Consider this task:

```js
const fs = require('fs')
const through = require('through2') // through2-style transform helper
// `task` is this module's task factory

const switchFileType = (str, type) => {
  // probably nicer with a regex replace..
  const pieces = str.split('.')
  pieces.pop()
  return pieces.join('.') + '.' + type
}

const throughUppercase = through(function (chunk, enc, cb) {
  cb(null, chunk.toString().toUpperCase())
})

const uppercaser = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Uppercase Transform'
}, ({ input }) =>
  fs.createReadStream(input)
    .pipe(throughUppercase)
    .pipe(fs.createWriteStream(switchFileType(input, 'uppercase')))
)
```
The `operation` for this task will return a writable stream. Then `dup.setWritable(operation)` will apply it to the task's duplexify stream, and similarly for readable. Then, in the `catchTask` action, the purpose is to resolve once the operation has completed. It's called "catch" because it halts the task lifecycle action pipeline until the operation is completed, so that resolve output and validate output can then be run.
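As a minimal sketch of what such a "catch" step could look like (the helper name here is hypothetical, not the library's actual API): wrap the operation's completion in a promise so the rest of the lifecycle pipeline can wait on it.

```javascript
// Hypothetical catch-style helper: resolve once the operation's
// writable stream has flushed everything ('finish'), reject on error.
// Output resolution and validation would run after this resolves.
function catchOperation (operation) {
  return new Promise((resolve, reject) => {
    operation.once('finish', resolve)
    operation.once('error', reject)
  })
}
```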
After output is successfully validated (file is not empty, etc.), I set an `_output` object on the duplex stream, and destroy the stream:

```js
function finish (uid) {
  stream._output = getTask(uid).resolvedOutput
  stream.destroy()
  // TODO not this
  // 'destroy' may as well be 'breaksCompatibleWithStreamsModules'
  stream.emit('destroy')
}
```
On `destroy()` the duplex stream emits a `close`. Then the lifecycle of a task is something like:

```js
uppercase()
  .on('destroy', function () {
    console.log(this._output) // object with absolute path of output
  })
```
and `join` will use that to run tasks one after another, collect all the outputs into the "output dump", etc.
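As a rough sketch of that idea (all names assumed; this is not the actual `join` implementation), chaining tasks on the `destroy` event and accumulating each `_output` could look like:

```javascript
// Hypothetical join: run tasks sequentially, collecting each task's
// _output into an "output dump" array once its 'destroy' event fires.
function join (...tasks) {
  return function () {
    const dump = []
    return tasks.reduce(
      (prev, task) => prev.then(() => new Promise((resolve, reject) => {
        task()
          .on('destroy', function () {
            dump.push(this._output) // output was set before 'destroy'
            resolve()
          })
          .on('error', reject)
      })),
      Promise.resolve()
    ).then(() => dump)
  }
}
```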
However, this breaks if the operation itself emits a `close` event, as happens with child processes. My solution to this (I only switched to the `emit('destroy')` hack recently while hacking features into refactored code) was to add a function to the stream object which could access things through a closure, and would run the output resolution and validation inside that function. On any "ending type" event - `end`, `close`, `finish` - you could call it. So the lifecycle for a task was:
```js
uppercase()
  .on('close', function () {
    const output = this._output() // runs output resolution and validation now, but "outside" the task
    console.log(output)
  })
```
Which is better because it uses no made-up events, but a little annoying because the whole task lifecycle will not actually run until `this._output()` is called. Perhaps a `taskWrapper` could call this automatically, and then itself emit a `close`.
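A hypothetical `taskWrapper` along those lines (sketch only; the event names and the `_output` convention come from above, everything else is assumed):

```javascript
// Wrap a task stream so that on any "ending type" event the output
// resolver runs exactly once, then a standard 'close' is emitted with
// this._output already populated as a plain object.
function taskWrapper (stream) {
  let resolved = false
  const finishUp = () => {
    if (resolved) return
    resolved = true
    stream._output = stream._output() // run resolution + validation now
    stream.emit('close')
  }
  ;['end', 'close', 'finish'].forEach(ev => stream.on(ev, finishUp))
  return stream
}
```

The `resolved` guard matters because re-emitting `close` triggers the wrapper's own listener again, and because an operation may emit more than one ending-type event.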
Ideally:

```js
someTask() // whole lifecycle contained inside here
  .on('close', // or some other standard event: end, finish, etc.
    function () {
      console.log(this._output) // prepared resolved+validated output
    })
```
This way the task lifecycle is contained in one place without weird wrappers that would themselves need to emit `close`, and the resolved output is available immediately when a task is "done".
I saw mention of a `flush` function, but perhaps that is only for writable streams?
Also, `operation`s which are actually just duplex streams, like a transform, can be set as the duplex inside `task`, and forking/parallel should work as if `task` were a regular duplex; so the "catch finish" need not apply, since no output files need to be resolved to absolute paths.