Giter Site home page Giter Site logo

cwait's Introduction

cwait

build status npm monthly downloads npm version

cwait provides a queue handler (TaskQueue) and a wrapper (Task) for promises, to limit how many are being resolved simultaneously. It can wrap any ES6-compatible promises. This allows for example limiting simultaneous downloads with minor changes to existing code. Just wrap your existing "download finished" promise and use it as before.

This is a tiny library with a single dependency, usable both in browsers and Node.js.

Usage

Create a new TaskQueue passing it whatever Promise constructor you're using (ES6, Bluebird, some other shim...) and the maximum number of promise-returning functions to run concurrently. Then just call queue.wrap(<function>) instead of <function> to limit simultaneous execution.

Simple Node.js example:

import * as Promise from 'bluebird';
import {TaskQueue} from 'cwait';

/** Queue allowing 3 concurrent function calls. */
var queue = new TaskQueue(Promise, 3);

Promise.map(list, download); // Download all listed files simultaneously.

Promise.map(list, queue.wrap(download)); // Download 3 files at a time.

See test/test.ts for some runnable code or run it like this:

git clone https://github.com/charto/cwait.git
cd cwait
npm install
npm test

Recursion

Recursive loops that run in parallel require special care. Nested concurrency-limited calls (that are not tail-recursive) must be wrapped in queue.unblock().

Here's a simple example that fails:

var queue = new (require('cwait').TaskQueue)(Promise, 3);

var rec = queue.wrap(function(n) {
    console.log(n);
    return(n && Promise.resolve(rec(n - 1)));
});

rec(10);

It only prints numbers 10, 9 and 8. More calls don't get scheduled because there are already 3 promises pending. For example Node.js exits immediately afterwards because the program is not blocked waiting for any system calls.

Passing a promise to queue.unblock(promise) tells queue that the current function will wait for promise to resolve before continuing. One additional concurrent function is then allowed until the promise resolves.

Be careful not to call queue.unblock() more than once (concurrently) from inside a wrapped function! Otherwise the queue may permit more simultaneous tasks than the intended limit.

Here is a corrected example:

var queue = new (require('cwait').TaskQueue)(Promise, 3);

var rec = queue.wrap(function(n) {
    console.log(n);
    return(n && queue.unblock(Promise.resolve(rec(n - 1))));
});

rec(10);

Advanced example with recursion

The following code recursively calculates the 10th Fibonacci number (55) running 3 recursive steps in parallel, each with an artificial 10-millisecond delay.

At the end, it prints the result (55) and the number of concurrent calls (3).

var queue = new (require('cwait').TaskQueue)(Promise, 3);

var maxRunning = 0;
var running = 0;
var delay = 10;

var fib = queue.wrap(function(n) {
    // "Calculation" begins. Track maximum concurrent executions.
    if(++running > maxRunning) maxRunning = running;

    return(new Promise(function(resolve, reject) {
        setTimeout(function() {
            // "Calculation" ends.
            --running;

            // Each Fibonacci number is the sum of the previous two, except
            // the first ones are 0, 1 (starting from the 0th number).
            // Calculate them in parallel and unblock the queue until ready.

            resolve(n < 2 ? n :
                queue.unblock(Promise.all([
                    fib(n - 1),
                    fib(n - 2)
                ])).then(function(r) {
                    // Sum results from parallel recursion.
                    return(r[0] + r[1]);
                })
            );
        }, delay);
    }));
});

fib(10).then(function(x) {
    console.log('Result: ' + x);
    console.log('Concurrency: ' + maxRunning);
});

API

Docs generated using docts

Class Task

Task wraps a promise, delaying it until some resource gets less busy.
Source code: <>

Methods:

new( ) Task<PromiseType> <>
 ▪ func () => PromiseType
 ▪ Promise PromisyClass<PromiseType>
.delay( ) PromiseType <>
Wrap task result in a new promise so it can be resolved later.
.resume( ) PromiseType <>
Start the task and call onFinish when done.
 ▪ onFinish () => void

Class TaskQueue

Source code: <>

Methods:

new( ) TaskQueue<PromiseType> <>
 ▪ Promise PromisyClass<PromiseType>
 ▪ concurrency number
.add( ) PromiseType <>
Add a new task to the queue.
It will start when the number of other concurrent tasks is low enough.
 ▪ func () => PromiseType
.unblock( ) PromiseType <>
Consider current function idle until promise resolves.
Useful for making recursive calls.
 ▪ promise PromiseType
.wrap( ) (...args: any[]) => PromiseType <>
Wrap a function returning a promise, so that before running
it waits until concurrent invocations are below this queue's limit.
 ▪ func (...args: any[]) => PromiseType
 ▫ thisObject? any

Properties:

.concurrency number
Number of promises allowed to resolve concurrently.

License

The MIT License

Copyright (c) 2015-2017 BusFaster Ltd

cwait's People

Contributors

jjrv avatar sheamunion avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

cwait's Issues

Change concurrency level after initialization

Currently it is possible to decrease the concurrency of the queue, and it behaves as expected.
Increasing it however does not work, because each time a task finishes , only one new task gets executed instead of one task per available slot ( concurrency-busyCount )

This feature would allow to dynamically change processing speed, and even pause the jobs for a period of time.

Queue doesn't process in order?

I wrote this simple program to test cwait:

var cwait = require('cwait');

function asyncLog(x,y){
	return(new Promise(function(resolve, reject) {
		setTimeout(function(){
			console.log(x,y);
			resolve();
		},100)
	}));
}

var queue = new cwait.TaskQueue(Promise, 1);

[1,2,3,4,5].forEach((el) => {
	console.log(el);
	queue.wrap(asyncLog)("q:",el);
})

I expected it to output :

1
2
3
4
5
q: 1
q: 2
q: 3
q: 4
q: 5

But instead it outputs:

1
2
3
4
5
q: 1
q: 2
q: 5
q: 4
q: 3

EDIT: After testing repeatedly, it does output the array in order sometimes. Now I am even more confused.

Usage with vanilla Promise.all

I have the following code:

        let emails = results.map((options, idx) => {
          Object.assign(options, {
            to: `${users[idx].name} <${users[idx].email}>`
          }, mailOpts);

          return new Promise((resolve, reject) => {
            transporter.sendMail(options, function (error, info) {
              if (error) {
                console.log('error', error);
                reject(error);
              } else {
                console.log('Message sent: ' + info.response);
                resolve();
              }
            });
          })
        });

        return Promise.all(emails);

by the examples it isn't very clear where I should add the wrapper, can you give me some guidance please?

My intention is to limit the send of 3 or 4 emails at a time

Better typing for TaskQueue.wrap()

This is the current type of TaskQueue.wrap():
wrap(func: (...args: any[]) => any, thisObject?: any): (...args: any[]) => any;

Don't we know that func will be returning a promise? And don't we know that wrap will also return a promise?

Add task to empty queue

Hi,

It's possible to add a task to the queue after it's been wrapped?

In case it's what happens if I add a task to a queue that's already been completely processed? It will restart processing the newly added task?

Does not appear to work with vanilla Promises

The use of Promise#finally() internally does not work with vanilla Promises in node. (unless I'm just missing something)

I just get the following error:

TypeError: func(...).finally is not a function

(this was on the latest node 5.10.0)

Using with vanilla Promises

I was wondering if there was an example on how to use this with the standard ES6 promises since they don't have a Promise.map function?

Thanks

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.