
pydra's Introduction


pydra logo

Pydra: Dataflow Engine

A simple dataflow engine with scalable semantics.

Pydra is a rewrite of the Nipype engine with mapping and joining as first-class operations. It forms the core of the Nipype 2.0 ecosystem.

The goal of pydra is to provide a lightweight Python dataflow engine for DAG construction, manipulation, and distributed execution.

Feature list:

  1. Python 3.7+ using type annotations and attrs
  2. Composable dataflows with simple node semantics. A dataflow can be a node of another dataflow.
  3. splitter and combiner provide many ways of compressing complex loop semantics (see the sketch after this list)
  4. Cached execution with support for a global cache across dataflows and users
  5. Distributed execution, presently via ConcurrentFutures, SLURM, and Dask (this is an experimental implementation with limited testing)
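
As a hedged illustration of the split/combine semantics mentioned above, here is a minimal sketch that reuses the pydra.mark.task, Workflow, split, combine, and result API appearing in the issues further down this page; exact behavior may differ between versions:

import pydra

@pydra.mark.task
def power(a, b):
    return a ** b

# Split the task over the cross-product of a and b, then combine the results over a.
wf = pydra.Workflow(name="wf", input_spec=["a", "b"], a=[1, 2], b=[2, 3])
wf.add(power(name="power", a=wf.lzin.a, b=wf.lzin.b).split(["a", "b"]).combine("a"))
wf.set_output({"out": wf.power.lzout.out})

wf(plugin="cf")          # run with the concurrent-futures plugin
print(wf.result())       # one combined list of results per remaining value of b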

API Documentation

Learn more about Pydra

Binder

Please note that mybinder times out after an hour.

Installation

pip install pydra

Note that installation fails with older versions of pip on Windows. Upgrade pip before installing:

pip install --upgrade pip
pip install pydra

Developer installation

Pydra requires Python 3.7+. To install in developer mode:

git clone git@github.com:nipype/pydra.git
cd pydra
pip install -e ".[dev]"

To run pydra's tests locally:

pytest -vs pydra

If you want to test execution with Dask:

git clone git@github.com:nipype/pydra.git
cd pydra
pip install -e ".[dask]"

It is also useful to install pre-commit:

pip install pre-commit
pre-commit install

pydra's People

Contributors

ablachair, adi611, basnijholt, chasejohnson3, dafrose, dependabot[bot], djarecka, effigies, ghisvail, jsmentch, kaczmarj, mgxd, montesmariana, nicolasgensollen, nicolocin, oesteban, peerherholz, pre-commit-ci[bot], r03ert0, rcali21, satra, shotgunosine, tclose, yarikoptic, yibeichan


pydra's Issues

Node API and execution model

Trying to think through the execution model, I think a useful distinction will be between graph nodes and execution nodes. Graph nodes would be aware of the graph structure and their own state:

class Node:
    def __init__(self, runnable, inputs=None, split=None, combine=None):
        self._parents = []
        self._runnable = runnable
        self._inputs = inputs or {}
        self._state = State(self, split, combine)

    def add_parent(self, node):
        self._parents.append(node)

    @property
    def state(self):
        return self._state.resolve([parent.state for parent in self._parents])

    def get_execution_nodes(self):
        return [ExecutionNode(self._runnable, inputs={**self._inputs, **state_inputs})
                for state_inputs in self.state.generate_inputs()]

    def run(self):
        for execnode in self.get_execution_nodes():
            if execnode.ready:
                execnode.run()

I would treat ExecutionNodes ephemerally. Their inputs should implement some kind of future/promise interface, and their outputs should be loadable from file, such that:

>>> execnode1 = node.get_execution_nodes()[0]
>>> execnode1.run()
>>> execnode1.results
{'out_a': <a>,
 'out_b': <b>}
>>> execnode2 = node.get_execution_nodes()[0]
>>> execnode2 is execnode1
False
>>> execnode2.inputs.in_a is execnode1.inputs.in_a
True
>>> execnode2.results
{'out_a': <a>,
 'out_b': <b>}
>>> shutil.rmtree(execnode2._results_path)
>>> execnode1.results
RuntimeError("Node not run")

This approach to Node leads to each node storing a partial State object that is only fully resolved at runtime, when all of its parents are known. If we have a sensible pairwise combination semantics for state, we can do the following:

class State:
    @classmethod
    def merge(cls, state_a, state_b, context):
        if state_a is state_b:
            return state_a
        if not state_b.axes:
            return state_a
        if not state_a.axes:
            return state_b
        # create a new state from two states, using current state's splits
        # to assist in reordering

    def resolve(self, states):
        if states == []:
            return self

        state = states[0]
        for tmp_state in states[1:] + [self]:
            state = self.merge(state, tmp_state, self._split)
        return state

It needs fleshing out, but the idea is that resolve will return the most recently changed State object, where changing means adding, removing, or reordering axes. Calling for inputs from the states will check to make sure that the shapes of inputs match, when needed.

I have some other thoughts as to how execution falls out of here, but I'll let those percolate for a bit. In the meantime, what do y'all think? Does this simplify/complicate how we're currently thinking of nodes? I do think it's a significantly different approach to state, so I don't want to charge ahead without feedback.

Context manager - Isolation/environment

Implement a context manager that provides isolation of inputs/execution environment. This context manager could in principle manage:

  • creating and checking for the appropriate environment (shell, container, remote)
  • moving/mounting inputs as necessary (and setting new paths appropriately)

A potential future extension: merge the pydra scheduler and plugins into a context manager.

Implement container execution

Related to #32

Minimal task support is currently merged. It should be extended to provide complete isolation: I should be able to run the pydra workflow itself in one container and have it launch other tasks inside that container or in remote containers (through SLURM, AWS, dind, Singularity, etc.).

cache propagation issue on OSX

The test test_cache_propagation1 doesn't work on OSX. I believe the problem is that OSX reports the tempdir differently:
wf.cache_dir is PosixPath('/var/folders/32/823z586933zbm26chcmcqq1m0000gp/T/tmpzmou1eal')
while t1.cache_dir is
PosixPath('/private/var/folders/32/823z586933zbm26chcmcqq1m0000gp/T/tmpzmou1eal')
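
A minimal sketch of what is likely going on, assuming the mismatch comes from /var being a symlink to /private/var on macOS; resolving both paths before comparing would make them equal:

# Hypothetical illustration: on macOS /var is a symlink to /private/var, so two
# handles on the same temporary directory can stringify differently.
import tempfile
from pathlib import Path

tmpdir = Path(tempfile.mkdtemp())   # may print as /var/folders/.../T/tmpXXXX on macOS
resolved = tmpdir.resolve()         # prints as /private/var/folders/.../T/tmpXXXX

# Comparing the raw paths can fail even though both point at the same directory;
# comparing resolved (real) paths does not.
assert tmpdir.resolve() == resolved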

problems with requirements

I'm having an issue with installing the package and with the way we specify the requirements. I wasn't paying attention to this; I know that we used to have requirements.txt, but it has disappeared and now everything is in __about__. That was fine, but I'm doing something wrong in #76, where I've started exposing some objects to the user in the __init__ files. Suggestions would be appreciated!

Nipype 1 and 2

This is more for Nipype 2.0, but I'm sticking it here for now and for the 0.1 release. We should push for this once the other 0.1 tasks are done, especially consolidation of the specs API.

  • Create a separate project for a nipype 1 to 2 workflow converter.
  • Export Nipype 1.0 interfaces to Pydra Task/Node specs

request: human node

Possibly enable human manipulation in some steps.

Hard to predict all the consequences... ;-)

CI: Test on OSX

Our PyPI classifiers say we support OSX, so we should add one or more Travis entries to test on it.

Networkx-Dask DAG API?

@satra -- I've been working on the profiler (i.e. via network flow optimization using a combination of dask + networkx) and am wondering if you already have an API on hand for formatting networkx-based DAGs in a dask-friendly format (see: https://docs.dask.org/en/latest/optimize.html#definitions and https://docs.dask.org/en/latest/custom-graphs.html)? If not, I can write one.

Not sure what your thoughts are on DAG optimization using dask, but it offers considerable flexibility: https://docs.dask.org/en/latest/optimize.html#customizing-optimization. Curious to get your thoughts...

@dPys

workflow_as_node does not update the input (if it was run already)

If I create a wfnd first and later use it within a wf that has a different input, it works properly only if the wfnd was not run before the wf. See the tests:

@pytest.mark.parametrize("plugin", Plugins)
def test_wfasnd_wfndupdate(plugin):
    """ workflow as a node
        workflow-node with one task and no splitter
        wfasnode input is updated to use the main workflow input
    """

    wfnd = Workflow(name="wfnd", input_spec=["x"], x=2)
    wfnd.add(add2(name="add2", x=wfnd.lzin.x))
    wfnd.set_output([("out", wfnd.add2.lzout.out)])

    wf = Workflow(name="wf", input_spec=["x"], x=3)
    wfnd.inputs.x = wf.lzin.x
    wf.add(wfnd)
    wf.set_output([("out", wf.wfnd.lzout.out)])
    wf.plugin = plugin

    with Submitter(plugin=plugin) as sub:
        sub(wf)

    results = wf.result()
    assert results.output.out == 5
    assert wf.output_dir.exists()


@pytest.mark.xfail(reason="wfnd is not updating input for its nodes")
@pytest.mark.parametrize("plugin", Plugins)
def test_wfasnd_wfndupdate_rerun(plugin):
    """ workflow as a node
        workflow-node with one task and no splitter
        wfasnode is run first and later is
        updated to use the main workflow input
    """

    wfnd = Workflow(name="wfnd", input_spec=["x"], x=2)
    wfnd.add(add2(name="add2", x=wfnd.lzin.x))
    wfnd.set_output([("out", wfnd.add2.lzout.out)])
    with Submitter(plugin=plugin) as sub:
        sub(wfnd)

    wf = Workflow(name="wf", input_spec=["x"], x=3)
    # trying to set before
    wfnd.inputs.x = wf.lzin.x
    wf.add(wfnd)
    # trying to set after add...
    wf.wfnd.inputs.x = wf.lzin.x
    wf.set_output([("out", wf.wfnd.lzout.out)])
    wf.plugin = plugin

    with Submitter(plugin=plugin) as sub:
        sub(wf)

    results = wf.result()
    assert results.output.out == 5
    assert wf.output_dir.exists()

I haven't debugged this much, but it looks like in the second test the input wf.wfnd.inputs.x is not taken from wf.inputs (it shows x=LF('wf', 'x')), so wf.wfnd.add2 takes the previously set input, i.e. 2.

Merge NodeBase and BaseTask

These should become one object, such that workflows can add Task objects directly. In pydra, a Task can have inputs for the underlying functionality but also for running the task being derived from the dataflow (e.g., environment specification).

All inputs to NodeBase should be keyword arguments; any unmatched arguments will be assumed to be inputs for the underlying function/command.
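
A minimal sketch of that keyword-argument partitioning, under the assumption that the merged class knows its own engine-level parameter names (the names below are hypothetical, not actual pydra API):

# Hypothetical sketch: split **kwargs into engine-level options and inputs for the
# wrapped function/command. NodeBase/Task internals are assumed, not pydra's real API.
ENGINE_OPTIONS = {"name", "environment", "cache_dir"}  # hypothetical engine-level keys

class MergedTask:
    def __init__(self, runnable, **kwargs):
        self.runnable = runnable
        # anything the engine recognizes stays with the task ...
        self.options = {k: v for k, v in kwargs.items() if k in ENGINE_OPTIONS}
        # ... everything else is treated as an input to the underlying function/command
        self.inputs = {k: v for k, v in kwargs.items() if k not in ENGINE_OPTIONS}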

Thoughts on reading the tutorials

These are comments prompted by working through the tutorials, and can be split out into focused issues as needed, but just wanted to start making notes.

The following is from working through intro_task_state, and skimming intro_workflow enough to write a couple example workflows to demonstrate a point below. I will pick up later.

Semantic

  1. task1.state.splitter == 'add_two.x' seems to be using the function name as the Task name. Does this cause problems in a workflow where I might use two of the same function-based Tasks? (I later realized that tasks in workflows are required to have an explicit name; I wrote this before I'd started on the workflow tutorial.)
  2. When splitting over multiple dimensions, the results are aggregated in a 1D list. I would have assumed that it would be ND, with the first axis matching the first variable split over, and so on.
  3. When combining, the order of the combiner does not affect the order of the results. I would assume that .combine(["a", "b"]) would produce something ordered like [(a1, b1), (a2, b1), (a1, b2), (a2, b2)] while .combine(["b", "a"]) would produce [(a1, b1), (a1, b2), (a2, b1), (a2, b2)]. (Or possibly the reverse; I haven't thought it through carefully enough to be sure.)
  4. .combine("x").combine("n") seems like it should be equivalent to .combine(["x", "n"]). Probably an edge case, but I think making this (and multiple splits on a single task) doable should help clarify the semantics for users, as well as our own thinking. For instance, these should be equivalent (they don't actually run, but hopefully they're clear):
@pydra.mark.task
def power(a, b):
    return a ** b

@pydra.mark.task
def identity(x):
    return x

wf1 = pydra.Workflow(name="wf1", input_spec=["a", "b"], a=[1, 2], b=[2, 3])
wf1.add(power(name="power", a=wf1.lzin.a, b=wf1.lzin.b).split(["a", "b"]).combine("a"))
wf1.add(identity(name="identity", x=wf1.power.lzout.out).combine("power.b"))
wf1.set_output({'out': wf1.identity.lzout.out})

wf2 = pydra.Workflow(name="wf2", input_spec=["a", "b"], a=[1, 2], b=[2, 3])
wf2.add(power(name="power", a=wf2.lzin.a, b=wf2.lzin.b).split(["a", "b"]).combine("a").combine("b"))
wf2.set_output({'out': wf2.power.lzout.out})

wf3 = pydra.Workflow(name="wf3", input_spec=["a", "b"], a=[1, 2], b=[2, 3])
wf3.add(power(name="power", a=wf3.lzin.a, b=wf3.lzin.b).split("a").split("b").combine(["a", "b"]))
wf3.set_output({'out': wf3.power.lzout.out})

Usability

  1. It would be nice for State.__str__ to show something that actually describes the internal state, rather than just repr(State).
  2. Combiners must be strings or lists, currently. It's not clear why this is a requirement. Any sequence seems acceptable to me.
  3. nr_proc is pretty unintuitive.

Aesthetic

  1. Perhaps we can think of a more intuitive name from a user's perspective than splitter in Task().split(splitter=...). I think it's fine as an internal variable, so we don't need to do a massive search-and-replace, but splitter kind of grates on me.
    • Although parameters are usually given noun names, we could follow Pandas and use a preposition like split(over=...) or split(by=...), which would read somewhat more like English. I'm not sure how this would feel to non-native speakers, though.
    • I can't think of a good noun at the moment.
    • We could also eliminate the need to think about the name at all by making it a positional-only parameter, so it would always be split("a") or split(["a", "b"]) or the like (see the signature sketch after this list).
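
A minimal sketch of the positional-only idea; note that this syntax needs Python 3.8+, while the README currently targets 3.7+, and the method body here is hypothetical:

# Hypothetical signature sketch: the "/" makes splitter positional-only, so callers
# always write task.split("a") or task.split(["a", "b"]) and never see the parameter name.
class Task:
    def split(self, splitter, /):
        self._splitter = splitter
        return self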

pydraboard

Realtime monitoring of pydra execution status and a messaging receiver.

Implement registered action hooks

Hooks will allow automations to kick in for specific events.

Possible enumerated locations for hooks:
task_init_pre
task_init_post
run_pre
run_post
run_task_pre
run_task_post
workflow_init_pre
workflow_init_post
workflow_run_pre
workflow_run_post

Hooks can be registered through the Task/Node API.
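
A hedged sketch of what registration through the Task/Node API might look like; the register_hook and fire methods and their wiring are hypothetical, not an existing pydra API:

# Hypothetical hook registry; only the hook point names come from the list above.
from collections import defaultdict

class TaskHooks:
    HOOK_POINTS = {
        "task_init_pre", "task_init_post",
        "run_pre", "run_post",
        "run_task_pre", "run_task_post",
        "workflow_init_pre", "workflow_init_post",
        "workflow_run_pre", "workflow_run_post",
    }

    def __init__(self):
        self._hooks = defaultdict(list)

    def register_hook(self, point, callback):
        """Attach a callback to one of the enumerated hook points."""
        if point not in self.HOOK_POINTS:
            raise ValueError(f"unknown hook point: {point}")
        self._hooks[point].append(callback)

    def fire(self, point, task):
        """Would be called by the engine when it reaches the given hook point."""
        for callback in self._hooks[point]:
            callback(task)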

Mapping semantics

Suppose we have a function f(a, b) and a node:

nodeA = Node(f, mapper=['a', 'b'])
nodeA.inputs = {'a': ['foo1', 'foo2'], 'b': ['bar1', 'bar2', 'bar3']}

Various further steps may occur, but then we have a new function g(d, e) such that we wish to map over e, and iterate in lockstep with the inputs of nodeA.a:

nodeC = Node(g, mapper=('nodeA.a', 'e'))
nodeC.inputs.e = ['baz1', 'baz2']

In this case, the nodeC instances that are downstream from nodeA.a == 'foo1' will receive e == 'baz1'. These will continue to map independently of nodeA.b.

This would be equivalent to a complete mapper:

nodeC = Node(g, mapper=[('nodeA.a', 'e'), 'nodeA.b'])
nodeC.inputs.e = ['baz1', 'baz2']

Whiteboard

(whiteboard photo: 20181009_102850)

Remove hard-coded data paths

See, e.g.,

@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")

Inputs = {
    "subject_id": "sub-01",
    "output_spaces": ["fsaverage", "fsaverage5"],
    "source_file": "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/func_preproc_ses_test_task_fingerfootlips_wf/bold_t1_trans_wf/merge/vol0000_xform-00000_merged.nii",
    "t1_preproc": "/fmriprep_test/output1/fmriprep/sub-01/anat/sub-01_T1w_preproc.nii.gz",
    "t1_2_fsnative_forward_transform": "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/anat_preproc_wf/surface_recon_wf/t1_2_fsnative_xfm/out.lta",
    "subjects_dir": "/fmriprep_test/output1/freesurfer/",
}

We should use datalad or OSF to retrieve data to run tests on.

add sink functionality to base task class

  • What would you like changed/added and why?

It would be good to be able to send the data at the end of a run to a specific location, server, or stream. This should also allow renaming outputs using a template, so this feature could combine datasink and rename into a task property.

  • What would be the benefit? Does the change make something easier to use?

Data could be streamed as each task finishes rather than waiting for an entire workflow to be run. This can enable triggering other services (a rough sketch follows).
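
A rough, hypothetical sketch of a per-task sink callback; the sink factory, the output naming template, and the point at which the engine would call it are all assumptions, not existing pydra API:

# Hypothetical sink sketch: copy each task's outputs to a destination directory,
# renaming them with a template. None of these names come from pydra itself.
import shutil
from pathlib import Path

def make_sink(destination, template="{task}_{output}{suffix}"):
    destination = Path(destination)
    destination.mkdir(parents=True, exist_ok=True)

    def sink(task_name, outputs):
        """Would be invoked by the engine as each task finishes."""
        for output_name, path in outputs.items():
            path = Path(path)
            target = destination / template.format(
                task=task_name, output=output_name, suffix=path.suffix
            )
            shutil.copy2(path, target)

    return sink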

workers should be run as context managers

The relevant workers should be run as context managers so that they clean up properly on exit.

This could be done within the worker objects' source:

with multiprocess.Pool() as p:
    pass

with concurrent.futures.ProcessPoolExecutor() as p:
    pass

with dask.distributed.Client() as c:
    pass

or the worker objects could be made context-aware and run as context managers in Submitter.

with pydra.engine.workers.DaskWorker() as d:
    pass
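
A hedged sketch of the second option, making a worker context-aware so that Submitter can use it in a with block; the class name, the concurrent.futures backend, and the run/close method names are assumptions about the worker interface, not pydra's actual workers:

# Hypothetical context-aware worker sketch.
import concurrent.futures

class ContextAwareWorker:
    def __init__(self, n_procs=None):
        self._pool = concurrent.futures.ProcessPoolExecutor(max_workers=n_procs)

    def run(self, fn, *args):
        return self._pool.submit(fn, *args)

    def close(self):
        self._pool.shutdown(wait=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # ensure the pool is cleaned up even if the submitter raises
        self.close()
        return False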

combiner not working properly

From the example in #119:

import pydra

@pydra.mark.task
def power(a, b):
    return a ** b

@pydra.mark.task
def identity(x):
    return x

wf1 = pydra.Workflow(name="wf1", input_spec=["a", "b"], a=[1, 2], b=[2, 3])
wf1.add(power(name="power", a=wf1.lzin.a, b=wf1.lzin.b).split(["a", "b"]).combine("a"))
wf1.add(identity(name="identity", x=wf1.power.lzout.out).combine("power.b"))
wf1.set_output({'out': wf1.identity.lzout.out})
wf1(plugin='cf')

This returns:
Result(output=Output(out=[[1, 1]]), runtime=None, errored=False)

It should return (I believe):

Result(output=Output(out=[[1, 1], [4, 8]]), runtime=None, errored=False)

missing dependencies for tests

For the bug report, include this information:

What version of Pydra are you using?
  • 0.0.1+298.g759ec5d
What were you trying to do?
  • run tests
What did you expect to happen?
  • tests to mostly pass
What actually happened?
  • tests failed due to missing dependencies
Can you replicate the behavior? If yes, how?
  • run make test in a fresh conda environment after having installed pydra with pip install --editable .

Steps that revealed the bug:

conda create -n pydra python=3.7
conda activate pydra
cd /path/to/pydra
pip install --no-cache-dir --editable .[tests]
make test

Dependencies that are missing:

  • numpy
  • psutil
  • dask and distributed

Should the plugin dependencies be added to the 'tests' extra?
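
If so, one possible shape for it (purely illustrative; pydra's real requirement specification lives in __about__, as noted in the "problems with requirements" issue above, and these groupings are assumptions):

# Hypothetical extras layout; treat this as a sketch of the proposal, not pydra's packaging.
EXTRAS_REQUIRE = {
    "tests": ["pytest", "numpy", "psutil"],
    "dask": ["dask", "distributed"],
}
# One answer to the question above: have the tests extra pull in the plugin extras too.
EXTRAS_REQUIRE["tests"] += EXTRAS_REQUIRE["dask"]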

functions with multiple values in return statement

If the default out output is used (without defining output_names; this will be in a PR) and a function returns two values, only one value is saved. See:

@pydra.to_task
def add_vector(x1, y1, x2, y2):
    return (x1 + x2, y1 + y2)

task5 = add_vector(name="add_vect", 
                   x1=[10, 20], y1=[1, 2], x2=[10, 20, 30], y2=[10, 20, 30])
task5.split(splitter=[("x1", "y1"), ("x2", "y2")])
task5(plugin="cf")
task5.result()

messaging based counter for rerun

We should determine whether the correct tasks are rerun by using messaging.

If we write a message saying "Loading precomputed results", then we can count such messages for any graph and determine whether a rerun succeeded in recomputing only the necessary elements while also respecting the graph hierarchy.
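
A minimal sketch of such a counter, assuming the messages end up as JSON-LD files in a directory; the message layout, file extension, and the exact "Loading precomputed results" text are assumptions:

# Hypothetical counter: scan a directory of messages and count how many report
# that precomputed results were loaded. The message schema here is assumed.
import json
from pathlib import Path

def count_precomputed(message_dir):
    count = 0
    for msg_file in Path(message_dir).glob("*.jsonld"):
        message = json.loads(msg_file.read_text())
        if "Loading precomputed results" in str(message):
            count += 1
    return count

# A rerun would be considered successful if this count matches the number of tasks
# expected to be served from the cache.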

result syntax

We should think about the syntax of results for a node and for a workflow, with and without mappers.
What should the syntax be when a wf is a node?

This will be important when adding join.

default values for FunctionTask

We should decide whether this code should raise a pydra exception:

@pydra.to_task
def sum_a(a, b):
    return a + b

task = sum_a()
task()

CI: Test on Windows

As noted in #81 (comment), I would suggest that we try out Azure Pipelines instead of AppVeyor.

Perhaps a good first step is for somebody with access to a Windows computer to try running the tests and estimate how big a job it will be to get us to compatibility. If that turns out to be a struggle, learning a new CI infrastructure might be too much to add on top of it.

Move to split / combine keywords for "mapping" subgraphs over inputs

To ease the transition for existing nipype users, a minimal goal is to ensure that MapNode and iterables+JoinNode functionality persists, in that nodes that fan out from a mapping can be recombined in the same order to produce lists of output.

Discussion was wide-ranging and perhaps at times experiencing a significant gravitas shortfall, but if I can remember correctly, we had the following genuine proposals:

Fan-out      Fan-in
map          unmap
map          collate
map          isquash (i.e. squash by index)
map          zap
split        combine
splitapply   combine

I'm sure I'm missing some. I would suggest we flesh out serious suggestions, with one comment per pair, and then have a quick round of voting, where heart (❤️) indicates your preference, thumbs-up (👍) indicates you would be happy enough with it, and thumbs-down (👎) indicates you strongly disapprove and will argue against it if it looks like we're settling on that one.

This is a separate discussion from value-based grouping or non-concatenation-based reductions, although you may want to weigh these options in terms of how easy they'll make naming such operations.

Edit: Please add any additional options you would like to vote on.

CI: Bump warnings up to errors in an allowed-failure mode

To stay on top of pending deprecations and to not become one of those libraries that causes all downstream packages to emit tons of warnings or fiddle with warnings filters, we should set some Travis jobs to turn warnings into errors. To allow this to be opportunistic, rather than a constant pain, we should mark these jobs as allow_failure.

It looks like Pytest has some mechanisms for modifying warning behavior, which will probably work better than the PYTHONWARNINGS="error" environment variable tried out in #84.

GitHub Actions

Are we planning to use GitHub actions? It seems our beta has been approved for the nipype organization.

UX: make task cache_dirs easier to identify

Moved from #100.

Currently, the cache directories for each task come in the form of <Class>_xxxxxxx. There is not an easy way for a user to track down a particular task's cache_dir, especially in the case of larger workflows.

We should consider making these more identifiable down the line.
