
bionode / bionode-watermill


💧Bionode-Watermill: A (Not Yet Streaming) Workflow Engine

Home Page: https://bionode.gitbooks.io/bionode-watermill/content/

License: MIT License

JavaScript 95.44% HTML 4.56%
bioinformatics bionode nodejs pipeline tool

bionode-watermill's Introduction

bionode.io

bionode

Modular and universal bioinformatics


Install

You need to install the latest Node.js first. Check nodejs.org or do the following:

# Ubuntu
sudo apt-get install npm
# Mac
brew install node
# Both
npm install -g n
n stable

To use bionode as a command line tool, you can install it globally with -g.

npm install bionode -g

Or, if you want to use it as a JavaScript library, install it in your local project folder (inside the node_modules directory) by running the same command without -g.

npm i bionode # 'i' can be used as a shortcut for 'install'

Documentation

Check our documentation at doc.bionode.io

Modules list

For a complete list of bionode modules, please check the repositories with the "tool" tag

Contributing

We welcome all kinds of contributions at all levels of experience. Please read the CONTRIBUTING.md to get started!

Communication channels

Don't be shy! Come talk to us 😃

Who's using Bionode?

For a list of some projects or institutions that we know of, check the USERS.md file. If you think you should be on that list or know who should, let us know! :D

Acknowledgements

We would like to thank all the people and institutions listed below!

bionode-watermill's People

Contributors

ayangromano, bmpvieira, thejmazz, tiagofilipe12


bionode-watermill's Issues

Test Reentrancy

  • join(A, B, C), C fails, run again
  • join(A, B, C), A's params change, hash of output of A changes, so B should rerun (instead of finding A's output which matches pattern and then skipping B)
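
The second scenario could be tested by keying a task's result on a hash of its params, so that changed params invalidate the earlier output. The following is only a minimal sketch: `hashParams` and `shouldRerun` are hypothetical helpers, not part of watermill, and the key sort is shallow (nested objects are not normalized).

```javascript
const crypto = require('crypto')

// Hypothetical helper: stable hash of a task's params.
// Top-level keys are sorted so that property order does not change the hash.
const hashParams = (params) => {
  const sorted = Object.keys(params).sort().reduce((acc, key) => {
    acc[key] = params[key]
    return acc
  }, {})
  return crypto.createHash('sha256').update(JSON.stringify(sorted)).digest('hex')
}

// Hypothetical check: rerun when no hash was recorded or when it differs.
const shouldRerun = (previousHash, params) => previousHash !== hashParams(params)

const firstRun = hashParams({ threads: 4, minQuality: 20 })
console.log(shouldRerun(firstRun, { minQuality: 20, threads: 4 })) // false: same params
console.log(shouldRerun(firstRun, { threads: 8, minQuality: 20 })) // true: params changed
```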

Trinity Pipeline

  1. rna sras (ants, flies, etc.)
  2. filter
  3. transcriptome assembly (trinity -> trinity.fasta)
  4. pull out longest transcript
  5. take sra reads that contributed to assembly, map them to assembly (e.g. blast)
  6. fastq that only contains reads that contributed to this longest transcript

Parallelize BAM chunks

  • parallelization with patches of bam
  • for example, split bam into 20 sections if 20 cores available
  • both options: concatenate vs. run analysis on regions

Example:

  • get whole genome reference, produce BAM from it
  • run, in parallel, a task on each chromosome
  • for each chromosome task, after it's done do further analysis AND
  • after all chromosomes finish, do something that needs all of them (e.g. GWAS)
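
The shape of this example can be sketched with plain Promises. All three functions below are hypothetical stand-ins for real tasks, and the chromosome names are made up; the point is only that each chromosome continues on its own while the combined step waits for all of them.

```javascript
// Hypothetical stand-ins for real tasks; each returns a Promise.
const analyzeChromosome = (chr) => Promise.resolve({ chr })
const furtherAnalysis = (result) => Promise.resolve({ ...result, annotated: true })
const combineAll = (results) => results.map((r) => r.chr).join(',')

const runPerChromosome = (chromosomes) =>
  Promise.all(
    // Each chromosome moves on to its follow-up as soon as its own task is done.
    chromosomes.map((chr) => analyzeChromosome(chr).then(furtherAnalysis))
  ).then(combineAll) // runs only after every chromosome has finished

runPerChromosome(['chr1', 'chr2', 'chrX']).then((summary) => console.log(summary))
```

Promise.all keeps results in input order, so the combined step sees chromosomes in the order they were listed, regardless of which finished first.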

Prevent Interference with Task Streams

Task will return a readable, writable, or duplex stream. However, a user who is making mischief can then interfere with it. Perhaps there is a use case for this, and users should simply be advised not to call stream methods on the streams that Task returns. Consider:

const config = {
  sraAccession: '2492428',
  referenceURL: 'http://ftp.ncbi.nlm.nih.gov/genomes/all/GCA_000988525.2_ASM98852v2/GCA_000988525.2_ASM98852v2_genomic.fna.gz'
}

// TODO take in an object stream to start off a bunch of these
const downloadReference = Task({
  input: config.referenceURL,
  output: new File(config.referenceURL.split('/').pop())
}, ({ input, output }) => request(input).pipe(fs.createWriteStream(output.value)) )

const s = downloadReference()
s.write('FOOBAR') // This gets written to the reference.gz file (and corrupts it)
s.on('finish', () => console.log('File written'))

This is mostly something to keep in mind; it is probably not worthwhile to find a way to enforce that this cannot happen, rather than just advising users not to do it unless they know what they are doing.

Simple usage tutorial

What do you think about adding a simple usage tutorial to the README, covering some of the basic usages of tasks and orchestrators? We already have 4 examples, but I think a walkthrough tutorial could help users develop new pipelines.

Check if file is empty

When running a task, watermill should check whether the file is empty before executing it. One simple way to do so would be to check whether the file size is 0.
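
A minimal sketch of the size check, using only Node's built-in fs module. `isEmptyFile` is a hypothetical helper name, and treating a missing file as empty is an assumption made here, not something the issue specifies.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// Hypothetical helper: true when the file is missing or has zero bytes.
const isEmptyFile = (filePath) => {
  try {
    return fs.statSync(filePath).size === 0
  } catch (err) {
    if (err.code === 'ENOENT') return true // assumption: a missing file counts as empty
    throw err
  }
}

// Usage with temporary files
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'watermill-'))
const emptyPath = path.join(dir, 'empty.vcf')
const fullPath = path.join(dir, 'full.vcf')
fs.writeFileSync(emptyPath, '')
fs.writeFileSync(fullPath, '##fileformat=VCFv4.2\n')

console.log(isEmptyFile(emptyPath)) // true
console.log(isEmptyFile(fullPath)) // false
```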

json schema validator on input, output, action stream

Would be super simple, essentially wrap a module into (x, f) => f(x) where f(x) is true or f(x) is false. x can then be something like

require('is-my-schema-valid')({
  required: true,
  type: 'object',
  properties: {
    // keys containing a hyphen must be quoted
    'rna-fastq': {
      required: true,
      type: RegExp,
      pattern: new RegExp(`^${speciesName}.${run}.fastq.gz`)
    }
  }
})

However, I just realized that is-my-schema-valid perhaps cannot compare RegExp objects, as they are not serialized by default by JSON.stringify(). @mafintosh, how difficult would it be to extend the plugin, perhaps with something like replacer and reviver functions, to facilitate comparisons of objects?

Taking a hint from how webpack lets you choose loaders by matching (somewhat implicitly understood) filetypes with regexes, we could apply arrays of validators to specific files, or to subtrees of resolved output objects.
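
Until the schema library question is settled, the (x, f) => f(x) idea can be sketched with plain functions, which have no trouble holding a RegExp. `validate` and `matchesPattern` are hypothetical names, and the species and run values are made up. Unlike the pattern above, this one escapes the dots and anchors the end of the string.

```javascript
// (x, f) => f(x): a validator is any function that returns true or false.
const validate = (x, f) => f(x)

// Hypothetical factory: checks that a named property is a string matching a RegExp.
const matchesPattern = (key, pattern) => (obj) =>
  typeof obj[key] === 'string' && pattern.test(obj[key])

// Made-up example values
const speciesName = 'apis-mellifera'
const run = 'SRR0000001'

// Dots are escaped so that they only match a literal '.'
const isRnaFastq = matchesPattern(
  'rna-fastq',
  new RegExp(`^${speciesName}\\.${run}\\.fastq\\.gz$`)
)

console.log(validate({ 'rna-fastq': 'apis-mellifera.SRR0000001.fastq.gz' }, isRnaFastq)) // true
console.log(validate({ 'rna-fastq': 'apis-melliferaXSRR0000001.fastq.gz' }, isRnaFastq)) // false
```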

Task Architecture

A Task is the basic building block of pipelines in Waterwheel. It needs to be able to handle streams, synchronous functions, and asynchronous functions. There needs to be a standard way to declare that a Task is over. As well, Tasks need to be joined into new Tasks, and parallelized into new Tasks. You should be able to join two Tasks that stream one into another without additional config.

Tasks can be:

  • BlockingTask:
    • input: anything - Array or Object of primitive, File, Stream
    • output: will not provide a Readable Stream, instead ending with or without a return value
  • StreamingTask:
    • input: anything
    • output: Readable Stream
  • AsyncTask:
    • input: anything
    • output: Promise preferred, or callback

A File is just a class that holds a value and is used (at the moment) in the task action creator for an input instanceof File check, which determines whether the file pattern (e.g. **/*.sra) should be resolved with globby. This is so that the task stream/promise creator can use a resolved input variable with the actual filename.

We will refer to joining, parallelization, and forking collectively as the orchestration of Tasks:

  • joining: join(task1, task2, taskn) -> Task
  • parallelization: parallel(task1, [task2, task3], [task4, task5]) -> Task
  • forking: consider this pipeline:
    [diagram: new doc 6_1]
    The seqtk merge task can be forked and provided to two other tasks: filter kmc and filter khmer. This can be done by defining a filter task with an array of task action creators. Then, since the kmc and khmer variants produce the same output, just via a different tool, the pipeline will automatically duplicate the bwa mem | samtools view | samtools sort; samtools index; samtools mpileup | bcftools call section of the pipeline for each of them.

One way to enable the orchestration of Tasks is with callbacks. The task creator takes an object defining input and output, and then a function describing the task action creator. This returns a function of next. The join method and parallel method programmatically assign a function to the next to achieve their goals.

Another way is with Promises. They are perhaps more elegant than callbacks, and can reject when things go wrong.

Another way to do this is through events. The Task function can emit an event once the object returned from the task creator has completed. This has the advantage that it helps define a standard way to declare that tasks are over. However, it can perhaps become messy listening for the same event when doing joins and parallels. This can be addressed by emitting a taskFinish event with some data that perhaps has a task uuid.
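
The event approach with a per-task id could look roughly like this. It is a sketch built on Node's EventEmitter; `createTask`, `whenFinished`, the shared `bus`, and the counter-based uid are all hypothetical and stand in for whatever the real Task would do.

```javascript
const { EventEmitter } = require('events')

const bus = new EventEmitter()
let nextId = 0

// Hypothetical task creator: runs `action` and announces completion with its uid.
const createTask = (action) => () => {
  const uid = `task-${nextId++}`
  // Deferred so that callers can attach listeners before the event fires.
  setImmediate(() => bus.emit('taskFinish', { uid, result: action() }))
  return uid
}

// Resolves only for the matching uid and ignores other tasks' events.
const whenFinished = (uid) => new Promise((resolve) => {
  const onFinish = (data) => {
    if (data.uid !== uid) return
    bus.removeListener('taskFinish', onFinish)
    resolve(data.result)
  }
  bus.on('taskFinish', onFinish)
})

const a = createTask(() => 'a.bam')()
const b = createTask(() => 'b.bam')()
Promise.all([whenFinished(a), whenFinished(b)]).then((results) => console.log(results))
```

Filtering on the uid is what keeps joins and parallels from reacting to each other's taskFinish events, at the cost of every listener seeing every event.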

Custom file validator over output object

This means to be able to pass a custom file validator on a per task basis, something like:

const mytask = task({
  input: '*.sra',
  output: '*.vcf',
  validators: (file) => vcf(file).hasSNPs(25), // reject if < 25 SNPs with imaginary API
  operation: ({ input, context }) => `sometool ${input} -o ${context.sample}`
})

It will also need to handle multiple output types and how to set up validators for each one.
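
One possible way to handle multiple output types is to group validators by file extension. This is only a sketch under that assumption: `validatorsByExt` and `failedOutputs` are hypothetical, and `snpCounts` is made-up data standing in for real file inspection.

```javascript
const path = require('path')

// Stand-in for real file inspection: SNP counts keyed by path (made-up data).
const snpCounts = { 'sample.vcf': 40, 'control.vcf': 3 }

// Hypothetical shape: validators grouped by output extension.
// Each validator takes a file path and returns true or false.
const validatorsByExt = {
  '.vcf': [(file) => (snpCounts[file] || 0) >= 25]
}

// Returns the outputs that fail any validator registered for their extension.
// Files with no registered validators pass by default.
const failedOutputs = (files, byExt) =>
  files.filter((file) =>
    !(byExt[path.extname(file)] || []).every((check) => check(file)))

console.log(failedOutputs(['sample.vcf', 'control.vcf', 'reads.bam'], validatorsByExt))
```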

(Also I just made up that context thing - but maybe some way to pass current sample through tasks in a multiple input pipeline? in any way, it is always a little awkward hard coding output filenames in tasks - could have done sample.vcf for example)

Common Workflow Language (CWL) support?

Hello! This is a very interesting project. In case you haven't seen it, there's a project called Common Workflow Language (CWL) that attempts to create a single document specifying a pipeline workflow that can be parsed by a multitude of programs so your pipeline can be run portably on the cloud, a laptop, a server, etc.

Wanted to let you know about other people in the reproducibility space :)

Async vs Sync Task creation

  • duplexify with async globby and set read/write stream later - didn't convert globby to promise yet, but tested async with setTimeout

Failed all test cases when I switched task from being a sync function to a co.wrap(function * (){ .. }) - which returns a promise, but is nice because yielding promises is nice.

Before:

const task = Task(...)
isStream(task) // true

After:

const task = Task(...)
task.then((stream) => isStream(stream) /* true */ )

The reason it became async is for globby - I am checking the filesystem to see which files exist that match patterns in input and output. This may become negligible as tasks become able to pass values to each other. However, perhaps the performance impact of globby.sync will not be negligible when running many tasks, or tasks that have many files.

For now, I am just going to use globby.sync. As well, glob-stream may be worth a try - will still be async but perhaps glob-stream can be piped into the main Task stream.

At some point, Task may need to have a stream that wraps a stream, so that you can stream an input object in, and stream out an output info object on an .end after the .end/.push(null) of the inner stream.

Resolve input from all tasks in join

mpileupandcall needs the reference and bam, but as per the join lineage:

const call = join(alignAndSort, samtoolsIndex, mpileupandcall)

it will only receive whatever samtoolsIndex provides as resolved output, which will just be the bai.
alignAndSort produces the bam, and receives the reference.

Current fix is to add

skipPassed: true

to the task so that it resolves input glob patterns from fs and not the output of the previous task.

So perhaps store all input and outputs of all tasks in a join lineage, and resolve inputs for tasks later on.
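
A rough sketch of that idea: keep the outputs of every task in the lineage and search them, most recent first, when resolving an input pattern. Everything here is hypothetical (`lineageOutputs`, `resolveInput`, the file names), and `globToRegExp` only understands `*`, so it is not a substitute for a real glob library.

```javascript
// Escape RegExp metacharacters in a literal string.
const escapeRegExp = (s) => s.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')

// Hypothetical: convert a simple glob into a RegExp (only '*' is supported).
const globToRegExp = (glob) =>
  new RegExp('^' + glob.split('*').map(escapeRegExp).join('.*') + '$')

// Outputs recorded for every task in the join lineage, oldest first (made-up data).
const lineageOutputs = [
  { task: 'alignAndSort', files: ['reads.bam'] },
  { task: 'samtoolsIndex', files: ['reads.bam.bai'] }
]

// Search the whole lineage, most recent task first, instead of only the previous task.
const resolveInput = (pattern, lineage) => {
  const re = globToRegExp(pattern)
  for (const { files } of [...lineage].reverse()) {
    const match = files.find((file) => re.test(file))
    if (match) return match
  }
  return undefined
}

console.log(resolveInput('*.bam', lineageOutputs)) // 'reads.bam', from alignAndSort
console.log(resolveInput('*.bai', lineageOutputs)) // 'reads.bam.bai', from samtoolsIndex
```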

Nextflow does this by declaring "channels", pushing files into them, and pulling files out of them, explicitly, in separate tasks. The problem with that was the need to manually duplicate channels to use the same file in other downstream tasks.

transform task

write a test for a simple task that uses a through2 transform stream (to convert lowercase to uppercase for example)

later perhaps same test can be used to check if in-between task streaming works
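
A starting point for such a test, as a sketch. It uses Node's built-in stream.Transform in place of through2 (a substitution made here to keep the example dependency-free), and `createUppercaser` and `collect` are hypothetical helper names.

```javascript
const { Transform } = require('stream')

// Built-in Transform used here instead of through2; the idea is the same.
const createUppercaser = () => new Transform({
  transform (chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

// Collect everything the stream emits, then hand the text to `done`.
const collect = (stream, done) => {
  const chunks = []
  stream.on('data', (chunk) => chunks.push(chunk))
  stream.on('end', () => done(Buffer.concat(chunks).toString()))
}

const upper = createUppercaser()
collect(upper, (text) => console.log(text)) // prints "ACGT"
upper.end('acgt')
```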

Timeline

A tentative plan for the way forward. I think once the core API is stable, time would be best spent reimplementing real world workflows that can be most improved with Streams. The primary concerns are

  • task orchestration
  • integration with SGE

After that work can begin on DSL, Docker, admin panel app, nbind etc.

If you think some plans have been allotted less time than they should be, or if some should be swapped, triaged for others, etc., don't hesitate to let me know. I'd like us all to agree upon realistic plans for these next 8 weeks that are exciting and fully satisfy the original overarching goal.

Week 5

  • formalize task orchestration API
    • blocking vs streaming vs async see #1
    • join
    • parallel
    • forking
    • transform stream as task
  • tests on core API with basic examples, CI, and coverage
  • pipe tasks through each other instead of one task with shellPipe taking an array

Summary

  • refactored original prototype
    • task object now a stream that can wrap streams/non-streams
    • This makes task easier to use with existing stream modules
  • chatted with Matthias and Max about API, internal code
    • happy with API design so far, keep it going forward
    • change new File to vanilla JS object {file: 'foo'} - easier for new devs, no peer dependencies - ab782f
      • prioritize the ability for other devs to work with it over the shorter/cleaner "new" syntax from the consumer perspective - DSL will be clean
    • don't emit custom events after "end" or "finish" of stream; use those events instead, and leave an object on the stream object "to be retrieved" (rather than emitting data with a custom event)
  • chatted with Davide
    • does big social data analytics type stuff
    • interested in using waterwheel, contributing
    • walked him through clone and install

For this week, continue improving waterwheel, examples with real world workflows. We already have a basic genomic one, I'd like to try out an RNASeq pipeline, or whatever you guys suggest. - stick to improving genomic workflow with sound reentrancy

Task orchestration core codebase largely resolved; parallel/forking/join becomes easy when task returns a regularly compliant stream (i.e. no more custom events). Forking not done yet, but because task is now a stream, it can be done with existing modules - e.g. multi-write-stream.

Week 6

  • Unit tests for task orchestration #18
    • check resolution scenarios (from previous task vs fs)
    • simple joins
    • simple parallels
    • joins and parallels should return valid task streams -> further composable
    • simple forking
    • transform task
    • stream search results, filter down to IDs, run pipeline on each ID
  • file validation #22
    • existence
    • not null check enabled by default
    • pass in custom validator function(s)
  • reentrancy #27
    • file timestamp or checksum
    • force rerun of all/specific task
    • --resume option
    • tee stream to file

Pushed down:

  • integrate with clustering systems like SGE - try to solve this bionode-sra issue
  • run tasks in their own folders
  • implement new real world workflows from papers

Week summary:

  • more unit tests on task orchestration, still need more for complex scenarios
  • basic reentrancy using existence of file, non null file, custom validator on file
  • pass in custom validators as an array, functions that take file path, return true/false
  • working simple variant calling example
  • came across the question of how to let the user provide validators that take more than one file (e.g. reads_1 and reads_2)
  • came across this problem #35

Week 7

  • play with "pass files from other tasks" problem
  • integrate with Docker - specify a container for each Task
  • formalize YAML/hackfile based DSL

Week Summary:

  • this week didn't feel very productive, but
  • took time to think about how to restructure codebase with consideration of the "pass files from other tasks" problem
  • set up a gitbook, partially documenting the restructure approach: https://thejmazz.gitbooks.io/bionode-waterwheel/content/
  • tasks have a hash for params, input, and output, and series of tasks are arranged hierarchically using these
  • the pipeline at any point in time is a very stateful entity. The config and tasks of the pipeline are now managed in a redux store. This lets me describe every change to a live task (e.g. resolving output, running output validators, finding matching input from previous tasks) with an action; each action results in a reducer being called that returns a new state. Reducers are small and more easily testable since they are pure functions. The giant codebase of task is now gradually moving into many smaller reducers with a smaller scope
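
To illustrate why pure reducers are easy to test, here is a minimal sketch that does not depend on redux itself. The action types, the state shape, and `taskReducer` are hypothetical and are not the ones used in the codebase.

```javascript
// Hypothetical action types and state shape, for illustration only.
const RESOLVE_OUTPUT = 'task/RESOLVE_OUTPUT'
const VALIDATE_OUTPUT = 'task/VALIDATE_OUTPUT'

const initialTask = { resolvedOutput: null, validated: false }

// A pure function: the same state and action always give the same new state,
// and the previous state is never mutated.
const taskReducer = (state = initialTask, action) => {
  switch (action.type) {
    case RESOLVE_OUTPUT:
      return { ...state, resolvedOutput: action.files }
    case VALIDATE_OUTPUT:
      return { ...state, validated: action.ok }
    default:
      return state
  }
}

// Replaying a list of actions reproduces the state, which is what makes
// state snapshots useful in bug reports.
const actions = [
  { type: RESOLVE_OUTPUT, files: ['/data/sample.vcf'] },
  { type: VALIDATE_OUTPUT, ok: true }
]
const finalState = actions.reduce(taskReducer, undefined)
console.log(finalState)
```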

Week 8

  • implement new real world workflows from papers

Week summary:

  • refactor fully completed - pipeline state managed by redux, actions are dispatched for each step in task lifecycle --> bug reports can be submitted with a snapshot of the exact state
    • big functions --> small, testable, pure functions
  • updated the simple vcf example to work with the refactored codebase
  • began implementation of "hierarchical output dump"
    • each task has a "trajectory" which is an array of keys of the output dump
    • task will match input patterns to absolute paths in the output dump, going through each "trajection" in the trajectory
    • keys of the DAG of the output dump are JSON.stringify(params) of each task
    • works somewhat but needs improvement (WIP)

Week 9

  • implement new real world workflows from papers

Week 10

  • implement new real world workflows from papers

Week 11

  • Project website
  • Complete documentation, examples, use cases, etc

Week 12

  • Final cleanup of website, docs, testing, examples

Extras/Pushed out

  • prototype a simple pipeline with nbind - an in browser functional Waterwheel pipeline will be a great way to introduce and teach the module
  • web/electron admin panel app - view tasks, edit tasks, see progress, see logs in realtime

Generalize traverse for resolving functions

Catch Operation Finish

Consider this task:

const switchFileType = (str, type) => {
  // probably nicer with a regex replace..
  const pieces = str.split('.')
  pieces.pop()
  return pieces.join('.') + '.' + type
}

const throughUppercase = through(function (chunk, enc, cb) {
  cb(null, chunk.toString().toUpperCase())
})

const uppercaser = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Uppercase Transform'
}, ({ input }) => 
  fs.createReadStream(input)
    .pipe(throughUppercase)
    .pipe(fs.createWriteStream(switchFileType(input, 'uppercase')))
)

The operation for this task will return a writable stream. Then dup.setWritable(operation) will apply it to the task duplexify stream. Similarly for readable.

Then in the catchTask action, the purpose is to resolve once the operation has completed. It's called "catch" because it stops the task lifecycle action pipeline until the operation is completed, so that resolve output and validate output can then be run.

After output is successfully validated (file is not empty, etc.), I set an _output object in the duplex stream, and destroy the stream:

function finish(uid) {
  stream._output = getTask(uid).resolvedOutput
  stream.destroy()
  // TODO not this
  // 'destroy' may as well be 'breaksCompatibleWithStreamsModules'
  stream.emit('destroy')
}

On destroy() the duplex stream emits a close. Then the lifecycle of a task is something like:

uppercase()
  .on('destroy', function () {
    console.log(this._output) // object with absolute path of output
  })

and join will use that to run tasks after each other, collect all the outputs into the "output dump", etc.

However, this breaks if the operation itself emits a close event, as happens with child processes. My earlier solution (I only switched to the emit('destroy') hack recently, while hacking features into the refactored code) was to add a function to the stream object, which could access things through a closure and would run the output resolution and validations inside that function. On any "ending type" event (end, close, finish) you could call it. So the lifecycle for a task was:

uppercase()
  .on('close', function () {
    const output = this._output() // runs output resolution and validation now, but "outside" the task
    console.log(output)
  })

Which is better because it uses no made up events, but a little annoying because if you just do

myTask()

it will not actually run the whole task lifecycle until this._output() is called. Perhaps a taskWrapper could auto call this, and then itself emit a close.

Ideally:

someTask() // whole lifecycle contained inside here
  .on('close', // or some otherwise standard event, end, finish, etc
    function () { // a regular function, so `this` is the task stream
      console.log(this._output) // prepared resolved+validated output
    })

This way task lifecycle is contained in one place without doing weird wrappers that would themselves need to emit close, and the resolved output is available immediately when a task is "done".

I saw mention of a flush function, but perhaps only for a writable stream?

Also, operations which are actually just duplex streams, like a transform, can be set as the duplex inside task, and forking/parallel should work as if task is a regular duplex; so the "catch finish" need not apply since no output files need to be resolved to absolute paths.
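
For operations that are streams, one option worth considering is stream.finished from Node core (available since Node 10), which calls back once a stream has ended, finished, or errored. The sketch below is an assumption about how it could be used here: `catchFinish` is a hypothetical wrapper, the output path is made up, and it does not cover child processes.

```javascript
const { PassThrough, finished } = require('stream')

// Hypothetical wrapper: waits for the operation's stream to be done, then
// stores the resolved output on the stream before calling back.
const catchFinish = (operationStream, resolveOutput, done) => {
  finished(operationStream, (err) => {
    if (err) return done(err)
    operationStream._output = resolveOutput()
    done(null, operationStream)
  })
  return operationStream
}

const operation = new PassThrough()
catchFinish(operation, () => ({ file: '/abs/path/out.uppercase' }), (err, stream) => {
  if (err) throw err
  console.log(stream._output) // resolved output is ready when the callback runs
})
operation.resume() // drain the readable side so that 'end' can fire
operation.end('acgt')
```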

Passing output from last task into new task

  • use property on task stream instance instead of data on custom event

For duplex streams, this is obvious.

But for tasks that create files:

const task = Task({
  input: 'foo',
  output: new File('*.txt')
}, ({ input, output }) => shell(`echo ${input} > log.txt`))

We wait for the task to finish, inside Task:

stream.on('end' /* or finish */, () => {
   // resolve output glob pattern and emit it
   const resolvedOutput = globby.sync(output.value)
   stream.emit('task.done', resolvedOutput)
})

Then it is Join's job to pass these on:

function join(...tasks) {
  const doTaskAndTryNext = (i, tasks) => { 
    const task = tasks[i]

    task()
      .on('task.done', (output) => {
        console.log('Task finished with output:')
        console.log(tab(1) + output)

        if (i+1 !== tasks.length) {
          doTaskAndTryNext(i+1, tasks)
        }
      })  
  }


  doTaskAndTryNext(0, tasks)
}

But nothing is being passed in yet.

Idea 1

Make Task always be a duplex object stream. Its first and final chunks will be objects pertaining to input and output. Then we need to wrap the actual stream of the task, for example, a Readable whose this._source is just another stream. This may complicate piping duplex streams together.

Idea 2

Wow. This feels so obvious now. Pass items in the first param of Task (which right now is just () => { ... }). I'll say my third idea anyways lol.

Idea 3

Have a task.made event emitted that emits a callback which can then pass in resolved outputs which become resolved inputs in the new task.

It is important to handle this case well, because with these pipelines, not everything is streamable. For example, doing an alignment requires an indexing on the reference which needs to run fully. But I think having Task always be a stream (i.e. it wraps promises into streams) makes it easier to then orchestrate things. And having an output as a File tells the orchestrator that this Task cannot be piped into another - unless we go the route where Task is always a duplex stream that wraps over other streams.
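
Idea 2 can be sketched with Promises: each task receives the previous task's resolved output as its first parameter. This is a simplified illustration, not the stream-based Task described above; `join`, `download`, and `dump` are hypothetical stand-ins.

```javascript
// Hypothetical task shape: a function that takes the previous task's resolved
// output and returns a Promise of its own resolved output.
const join = (...tasks) => (initialInput) =>
  tasks.reduce((previous, task) => previous.then(task), Promise.resolve(initialInput))

// Stand-ins for real tasks
const download = (accession) => Promise.resolve(`${accession}.sra`)
const dump = (sraFile) => Promise.resolve(sraFile.replace(/\.sra$/, '.fastq'))

const pipeline = join(download, dump)
pipeline('2492428').then((output) => console.log(output)) // prints "2492428.fastq"
```

Because join itself returns a function with the same shape as a task, joined pipelines can be passed to another join.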

Types

value

Pass something in directly, as it is, with no resolution. Could be a boolean, string, number, Array, Function, Promise, ...

{ value: 'foo' }

file

A glob expression/regex to be resolved to a full path before/after task

{ file: '**/*.sra' }

stream

Use this if the task should take a stream as input, output, or both.

task 1

{ input: { value: '2492428' }, output: { stream: 'stdout' } }

task 2

{ input: { stream: 'stdin' }, output: { file: '*_genomic.fna.gz' } }

This will take the output stream of the preceding task and pipe it into the input of this task.

It is also possible to stream out an existing (or as it is created) file:

{ input: { 'stream-file': '*.sam' }, output: { stream: 'stdout' } }
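
A sketch of how the three basic descriptor types might be resolved. `resolveDescriptor` is hypothetical; `listFiles` stands in for a glob library call and `upstream` for the preceding task's stream, and both are supplied by the caller. The 'stream-file' variant is not covered.

```javascript
const { PassThrough } = require('stream')

// Hypothetical resolver for the value, file, and stream descriptor types.
const resolveDescriptor = (descriptor, { listFiles, upstream }) => {
  if ('value' in descriptor) return descriptor.value // passed through untouched
  if ('file' in descriptor) return listFiles(descriptor.file) // pattern -> matching paths
  if ('stream' in descriptor) return upstream // wired to the preceding task
  throw new TypeError('Unknown descriptor: ' + JSON.stringify(descriptor))
}

// Usage with stand-in helpers (made-up path)
const helpers = {
  listFiles: (pattern) => (pattern === '**/*.sra' ? ['/data/reads.sra'] : []),
  upstream: new PassThrough()
}
console.log(resolveDescriptor({ value: '2492428' }, helpers))
console.log(resolveDescriptor({ file: '**/*.sra' }, helpers))
```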

Integrate nodestream

nodestream - Storage-agnostic streaming library for binary data transfers

This is beneficial because it can move the "tee to a file" work off of us perhaps, but more importantly, agnostically provides transfer to various cloud services. For now, can just use it for local filesystem.

Also need to consider how tasks produce output. If it is a program that takes an outputFile as params for example, do we need to create a readstream on that file as it is created to produce an outgoing stream of it?

Not sure if enhancement or feature, it is a bit of both - not entirely necessary for MVP - but very useful to have.

Pipeline Graph (DAG) Visualization

It is useful to have a visual representation of the Directed Acyclic Graph (DAG) that is produced during the execution of a pipeline.

In the graph,

  • each node is a task
  • a node may use outputs from n parent nodes as input(s). each input value will be resolved from one node. this information is not currently stored. it could be done as an alternative edge type (or perhaps use discrete edge weightings for different edge types)
  • in these visualizations, think of three vertical | as the child node of two parent nodes. TODO actual graph diagrams
  • join(A, B) creates the DAG a ---> b
  • junction(A, B) creates the DAG
a ---|
     |--->
b ---|
  • join(junction(A, B), C) creates the DAG
a ---|
     |---> c
b ---|
  • join(A, fork(X, Y), C) creates the DAG
      |---> x ---> c'
a --->|
      |---> y ---> c''

The redux reducer for the DAG is here. It uses graph.js.

The graph exists in the store under the path collection (i.e. a valid selector would be (state) => state.collection).

A function jsonifyGraph is also exported. This is because the graph object from graph.js is not serializable. This creates a serializable JSON representation of the graph.

See here how the collection (aka DAG) is logged out during task resolution for debug.

A first implementation of this could be to write the JSON graph to disk during the pipeline execution, overwriting the previous file whenever an ADD_OUTPUT or ADD_JUNCTION_VERTEX action has been dispatched (i.e. whenever the state of the DAG changes). This way, if a task fails, at least we have the last good graph stored.
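
A minimal sketch of the snapshot step. The vertices-and-edges shape below is an assumption made for illustration and is not the structure produced by jsonifyGraph; `writeSnapshot` and the temporary file name are likewise hypothetical.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// Hypothetical serializable shape: plain arrays of vertices and edges.
// This example encodes join(junction(A, B), C).
const graph = {
  vertices: ['a', 'b', 'c'],
  edges: [{ from: 'a', to: 'c' }, { from: 'b', to: 'c' }]
}

// Overwrites the snapshot each time the DAG changes, so the most recent
// graph survives a later task failure.
const writeSnapshot = (file, g) => fs.writeFileSync(file, JSON.stringify(g, null, 2))

const snapshotPath = path.join(os.tmpdir(), `watermill-dag-${process.pid}.json`)
writeSnapshot(snapshotPath, graph)

// Reading it back gives a plain object that a visualization could consume.
const restored = JSON.parse(fs.readFileSync(snapshotPath, 'utf8'))
console.log(restored.edges.length) // 2
```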

Then it is a matter of parsing that JSON into a visualization using something like d3.

Suggestions to improve the way the graph is handled within watermill are welcome. Perhaps there is a better serializable format to use (e.g. graphml format).

BONUS

  • do it in real time with an Electron/Browser app listening to changes in the redux store. New nodes should be added as the tasks run.
