
better-queue's Introduction

Better Queue - Powerful flow control



Super simple to use

Better Queue is designed to be simple to set up but still let you do complex things.

  • Persistent (and extendable) storage
  • Batched processing
  • Prioritize tasks
  • Merge/filter tasks
  • Progress events (with ETA!)
  • Fine-tuned timing controls
  • Retry on fail
  • Concurrent batch processing
  • Task statistics (average completion time, failure rate and peak queue size)
  • ... and more!

Install (via npm)

npm install --save better-queue

Quick Example

var Queue = require('better-queue');

var q = new Queue(function (input, cb) {

  // Some processing here ...
  var result = input; // placeholder: replace with your real result

  cb(null, result);
});

q.push(1)
q.push({ x: 1 })

You will be able to combine any (and all) of these options for your queue!

Queuing

It's very easy to push tasks into the queue.

var q = new Queue(fn);
q.push(1);
q.push({ x: 1, y: 2 });
q.push("hello");

You can also pass a callback as the second parameter to push; it is called when that task is done. For example:

var q = new Queue(fn);
q.push(1, function (err, result) {
  // Results from the task!
});

You can also listen to events on the Ticket object returned by push.

var q = new Queue(fn);
q.push(1)
  .on('finish', function (result) {
    // Task succeeded with {result}!
  })
  .on('failed', function (err) {
    // Task failed!
  })

Alternatively, you can subscribe to the queue's events.

var q = new Queue(fn);
q.on('task_finish', function (taskId, result, stats) {
  // taskId = 1, result: 3, stats = { elapsed: <time taken> }
  // taskId = 2, result: 5, stats = { elapsed: <time taken> }
})
q.on('task_failed', function (taskId, err, stats) {
  // Handle error, stats = { elapsed: <time taken> }
})
q.on('empty', function (){})
q.on('drain', function (){})
q.push({ id: 1, a: 1, b: 2 });
q.push({ id: 2, a: 2, b: 3 });

The empty event fires when all of the tasks have been pulled off the queue (there may still be tasks running!)

The drain event fires when there are no more tasks on the queue and no more tasks are running.
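To make the difference between the two events concrete, here is a hypothetical miniature queue. `MiniQueue` is made up for this sketch and is not better-queue's implementation. It models only when `empty` and `drain` fire:

```javascript
const { EventEmitter } = require('events');

// Hypothetical miniature queue (not better-queue's code) that only models
// when 'empty' and 'drain' fire.
class MiniQueue extends EventEmitter {
  constructor(processFn, concurrent = 1) {
    super();
    this.processFn = processFn;   // (task, cb) => void
    this.concurrent = concurrent; // max tasks running at once
    this.pending = [];            // tasks still waiting on the queue
    this.running = 0;             // tasks currently being processed
  }

  push(task) {
    this.pending.push(task);
    setImmediate(() => this._next()); // start processing on a later tick
  }

  _next() {
    while (this.running < this.concurrent && this.pending.length > 0) {
      const task = this.pending.shift();
      this.running++;
      // 'empty': nothing left to pull off, but tasks may still be running
      if (this.pending.length === 0) this.emit('empty');
      this.processFn(task, () => {
        this.running--;
        // 'drain': nothing queued and nothing running
        if (this.pending.length === 0 && this.running === 0) this.emit('drain');
        this._next();
      });
    }
  }
}
```

With a concurrency of 2 and two tasks pushed, `empty` fires as soon as the second task is pulled off, before either task finishes. `drain` fires only after both callbacks have run.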

You can control how many tasks process at the same time.

var q = new Queue(fn, { concurrent: 3 })

Now the queue will allow 3 tasks running at the same time. (By default, we handle tasks one at a time.)

You can also turn the queue into a stack by turning on filo.

var q = new Queue(fn, { filo: true })

Now the most recently pushed tasks will be handled first.

back to top


Task Management

Task ID

Tasks can be given an ID to help identify and track them as they go through the queue.

By default, we look for task.id to see if it's a string property, otherwise we generate a random ID for the task.

You can pass in an id option to change this behaviour. Here are some examples of how:

var q = new Queue(fn, {
  // Pick ONE of these; an object literal keeps only the last `id` key.
  id: 'id',      // Default: task's `id` property
  // id: 'name', // task's `name` property
  // id: function (task, cb) {
  //   // Compute the ID
  //   cb(null, 'computed_id');
  // }
})

One thing you can do with Task ID is merge tasks:

var counter = new Queue(function (task, cb) {
  console.log("I have %d %ss.", task.count, task.id);
  cb();
}, {
  merge: function (oldTask, newTask, cb) {
    oldTask.count += newTask.count;
    cb(null, oldTask);
  }
})
counter.push({ id: 'apple', count: 2 });
counter.push({ id: 'apple', count: 1 });
counter.push({ id: 'orange', count: 1 });
counter.push({ id: 'orange', count: 1 });
// Prints out:
//   I have 3 apples.
//   I have 2 oranges.

By default, if tasks have the same ID they replace the previous task.

var counter = new Queue(function (task, cb) {
  console.log("I have %d %ss.", task.count, task.id);
  cb();
})
counter.push({ id: 'apple', count: 1 });
counter.push({ id: 'apple', count: 3 });
counter.push({ id: 'orange', count: 1 });
counter.push({ id: 'orange', count: 2 });
// Prints out:
//   I have 3 apples.
//   I have 2 oranges.
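The merge and replace behaviours can be sketched with a `Map` keyed by task ID. This is a hypothetical illustration, not better-queue's code, and `createIdQueue` and `sentence` are made-up names. Unlike the real `merge` option, it uses a synchronous merge function instead of a callback:

```javascript
// Hypothetical sketch of ID-based merging (not better-queue's internals).
// Queued tasks live in a Map keyed by task.id. Pushing a task whose ID is
// already queued combines the two with `merge`, which defaults to replacing
// the old task with the new one.
function createIdQueue(merge = (oldTask, newTask) => newTask) {
  const queued = new Map(); // a Map keeps the first-insertion position of each key

  return {
    push(task) {
      queued.set(task.id, queued.has(task.id) ? merge(queued.get(task.id), task) : task);
    },
    // Remove and return every queued task, in first-pushed-ID order.
    takeAll() {
      const tasks = [...queued.values()];
      queued.clear();
      return tasks;
    },
  };
}

const sentence = (task) => `I have ${task.count} ${task.id}s.`;
```

Pushing the four `apple`/`orange` tasks from either example and mapping `takeAll()` through `sentence` yields `['I have 3 apples.', 'I have 2 oranges.']`. This holds whether you use a summing merge or the default replace.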

You can also use the task ID when subscribing to events from Queue.

var counter = new Queue(fn)
counter.on('task_finish', function (taskId, result) {
  // taskId will be 'jim' or 'bob'
})
counter.push({ id: 'jim', count: 2 });
counter.push({ id: 'bob', count: 1 });

Batch Processing

Your processing function can also be modified to handle multiple tasks at the same time. For example:

var ages = new Queue(function (batch, cb) {
  // Batch 1:
  //   [ { id: 'steve', age: 21 },
  //     { id: 'john', age: 34 },
  //     { id: 'joe', age: 18 } ]
  // Batch 2:
  //   [ { id: 'mary', age: 23 } ]
  cb();
}, { batchSize: 3 })
ages.push({ id: 'steve', age: 21 });
ages.push({ id: 'john', age: 34 });
ages.push({ id: 'joe', age: 18 });
ages.push({ id: 'mary', age: 23 });

Note how the queue will only handle at most 3 items at a time.

Below is another example of a batched call with numbers.

var ages = new Queue(function (batch, cb) {
  // batch = [1,2,3]
  cb();
}, { batchSize: 3 })
ages.push(1);
ages.push(2);
ages.push(3);
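Here is a hypothetical sketch of how pushed tasks divide into batches of at most `batchSize`. `toBatches` is a made-up helper. In a running queue, batch contents also depend on timing options such as `batchDelay`:

```javascript
// Hypothetical sketch: split queued tasks, in order, into batches of at most
// batchSize items.
function toBatches(tasks, batchSize) {
  const batches = [];
  for (let i = 0; i < tasks.length; i += batchSize) {
    batches.push(tasks.slice(i, i + batchSize)); // slice clamps at the array end
  }
  return batches;
}
```

For the ages example, `toBatches(['steve', 'john', 'joe', 'mary'], 3)` gives `[['steve', 'john', 'joe'], ['mary']]`.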

Filtering, Validation and Priority

You can also format (and filter) the input that arrives from a push before it gets processed by the queue by passing in a filter function.

var greeter = new Queue(function (name, cb) {
  console.log("Hello, %s!", name)
  cb();
}, {
  filter: function (input, cb) {
    if (input === 'Bob') {
      return cb('not_allowed');
    }
    return cb(null, input.toUpperCase())
  }
});
greeter.push('anna'); // Prints 'Hello, ANNA!'

This can be particularly useful if your queue needs to do some pre-processing, input validation, database lookup, etc. before you load it onto the queue.

You can also define a priority function to control which tasks get processed first.

var greeter = new Queue(function (name, cb) {
  console.log("Greetings, %s.", name);
  cb();
}, {
  priority: function (name, cb) {
    if (name === "Steve") return cb(null, 10);
    if (name === "Mary") return cb(null, 5);
    if (name === "Joe") return cb(null, 5);
    cb(null, 1);
  }
})
greeter.push("Steve");
greeter.push("John");
greeter.push("Joe");
greeter.push("Mary");

// Prints out:
//   Greetings, Steve.
//   Greetings, Joe.
//   Greetings, Mary.
//   Greetings, John.

If filo is set to true in the example above, then Joe and Mary would swap order.
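The ordering above can be reproduced with a sort: higher priority first, with ties broken by push order (or by reverse push order when `filo` is on). This is a hypothetical sketch, and `processingOrder` and `priorityOf` are made-up names. It assumes all four names are queued before processing starts; in a live queue, the first pushed task may start before later pushes arrive.

```javascript
// Hypothetical sketch of processing order: higher priority first; ties are
// broken by push order (FIFO) or reverse push order when filo is true.
function processingOrder(tasks, priorityOf, filo = false) {
  return tasks
    .map((task, index) => ({ task, index, priority: priorityOf(task) }))
    .sort((a, b) =>
      b.priority - a.priority || (filo ? b.index - a.index : a.index - b.index))
    .map((entry) => entry.task);
}

// Same priorities as the greeter example above.
const priorityOf = (name) => ({ Steve: 10, Mary: 5, Joe: 5 })[name] ?? 1;
```

`processingOrder(['Steve', 'John', 'Joe', 'Mary'], priorityOf)` returns `['Steve', 'Joe', 'Mary', 'John']`. Passing `true` as the third argument swaps Joe and Mary. With equal priorities, `filo` simply reverses push order.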

back to top


Queue Management

Retry

You can set tasks to be retried up to maxRetries times if they fail. By default, failed tasks are not retried. Optionally, you can set a retryDelay (in milliseconds) to wait a little while before each retry.

var q = new Queue(fn, { maxRetries: 10, retryDelay: 1000 })
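Here is a hypothetical sketch of these semantics. `runWithRetry` is a made-up helper, not better-queue's code. It assumes `maxRetries` counts retries after the first attempt, so a task runs at most `maxRetries + 1` times.

```javascript
// Hypothetical sketch of retry semantics (not better-queue's code): one
// initial attempt plus up to maxRetries retries, waiting retryDelay ms before
// each retry. `done` receives (err, result, attemptsUsed).
function runWithRetry(processFn, task, { maxRetries = 0, retryDelay = 0 }, done) {
  let attempt = 0;
  const tryOnce = () => {
    attempt++;
    processFn(task, (err, result) => {
      if (!err) return done(null, result, attempt);          // success
      if (attempt > maxRetries) return done(err, undefined, attempt); // out of retries
      setTimeout(tryOnce, retryDelay);                       // wait, then retry
    });
  };
  tryOnce();
}
```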

Timing

You can configure the queue to have a maxTimeout.

var q = new Queue(function (name, cb) {
  someLongTask(function () {
    cb();
  })
}, { maxTimeout: 2000 })

After 2 seconds, the task is treated as failed with a timeout error instead of continuing to wait for the callback.
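Here is a hypothetical sketch of a `maxTimeout` guard. `withTimeout` is a made-up wrapper, and the `'timeout'` error message is this sketch's choice, not necessarily what better-queue reports. Whichever comes first, the task's callback or the timer, decides the outcome; the later one is ignored.

```javascript
// Hypothetical maxTimeout guard (not better-queue's code).
function withTimeout(processFn, maxTimeout) {
  return (task, cb) => {
    let settled = false;
    const timer = setTimeout(() => {
      if (settled) return;
      settled = true;
      cb(new Error('timeout')); // message chosen for this sketch
    }, maxTimeout);
    processFn(task, (err, result) => {
      if (settled) return; // late callback after the timeout: ignored
      settled = true;
      clearTimeout(timer);
      cb(err, result);
    });
  };
}
```

The timeout only stops the caller from waiting. The underlying work keeps running unless you also cancel it (see Cancel/Abort).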

You can also delay the queue before it starts its processing. This is the behaviour of a timed cargo.

var q = new Queue(function (batch, cb) {
  // Batch [1,2] will process after 2s.
  cb();
}, { batchSize: 5, batchDelay: 2000 })
q.push(1);
setTimeout(function () {
  q.push(2);
}, 1000)

You can also set afterProcessDelay, which will delay processing between tasks.

var q = new Queue(function (task, cb) {
  cb(); // Will wait 1 second before taking the next task
}, { afterProcessDelay: 1000 })
q.push(1);
q.push(2);

In addition to batchDelay, you can set batchDelayTimeout. It fires off a batch early if no new tasks have been pushed to the queue within batchDelayTimeout milliseconds.

var q = new Queue(fn, {
  batchSize: 50,
  batchDelay: 5000,
  batchDelayTimeout: 1000
})
q.push(1);
q.push(2);

In the example above, the queue waits up to 5s for 50 items to accumulate. It processes the batch early if no new task has been added for 1s.
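The interaction of `batchSize`, `batchDelay` and `batchDelayTimeout` can be modelled as a pure function of push times. This is a hypothetical model, and `firstBatch` is a made-up name. It assumes `batchDelay` is measured from the first push, as the timed-cargo comment above suggests. It also assumes that a push at exactly the firing time still joins the batch.

```javascript
// Hypothetical model (not better-queue's scheduler) of when the first batch
// fires, given ascending push times in ms. It fires at the earliest of:
//   - the moment the batch reaches batchSize tasks,
//   - batchDelay ms after the first push,
//   - batchDelayTimeout ms after a push that is not followed by another
//     push within that time.
function firstBatch(pushTimes, { batchSize, batchDelay = 0, batchDelayTimeout = Infinity }) {
  let fireAt = pushTimes[0] + batchDelay;
  for (let i = 0; i < pushTimes.length; i++) {
    const t = pushTimes[i];
    if (t > fireAt) break;                           // batch already fired
    if (i + 1 === batchSize) { fireAt = t; break; }  // batch is full
    const idleDeadline = t + batchDelayTimeout;
    const next = pushTimes[i + 1];
    if (next === undefined || next > idleDeadline) { // queue goes quiet
      fireAt = Math.min(fireAt, idleDeadline);
      break;
    }
  }
  // Tasks pushed at or before the firing time make it into the batch.
  const size = Math.min(batchSize, pushTimes.filter((t) => t <= fireAt).length);
  return { fireAt, size };
}
```

For the example above (two pushes at roughly t = 0), `firstBatch([0, 0], { batchSize: 50, batchDelay: 5000, batchDelayTimeout: 1000 })` returns `{ fireAt: 1000, size: 2 }`. For the timed-cargo example, `firstBatch([0, 1000], { batchSize: 5, batchDelay: 2000 })` returns `{ fireAt: 2000, size: 2 }`.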

Precondition

You can define a precondition function that checks whether it's OK to process the next batch. If the precondition fails, the queue keeps calling it (every preconditionRetryTimeout milliseconds) until it passes again.

var q = new Queue(function (batch, cb) {

  // Do something that requires internet

  cb();
}, {
  precondition: function (cb) {
    isOnline(function (err, ok) {
      if (ok) {
        cb(null, true);
      } else {
        cb(null, false);
      }
    })
  },
  preconditionRetryTimeout: 10*1000 // If we go offline, retry every 10s
})
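Here is a hypothetical sketch of the retry loop behind `precondition` and `preconditionRetryTimeout`. `whenPreconditionPasses` is a made-up name, and treating a callback error like a failed check is an assumption.

```javascript
// Hypothetical precondition loop (not better-queue's code): call
// precondition(cb); until it reports success, check again every
// preconditionRetryTimeout ms, then run `next`.
function whenPreconditionPasses(precondition, preconditionRetryTimeout, next) {
  precondition((err, ok) => {
    if (!err && ok) return next(); // passed: go ahead and process
    setTimeout(
      () => whenPreconditionPasses(precondition, preconditionRetryTimeout, next),
      preconditionRetryTimeout
    );
  });
}
```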

Pause/Resume

There are options to control processes while they are running.

You can return an object from your processing function with the functions cancel, pause and resume. This allows a running operation to be paused, resumed or cancelled.

var uploader = new Queue(function (file, cb) {
  
  var worker = someLongProcess(file, cb); // placeholder: calls cb when the upload ends

  return {
    cancel: function () {
      // Cancel the file upload
    },
    pause: function () {
      // Pause the file upload
    },
    resume: function () {
      // Resume the file upload
    }
  }
})
uploader.push('/path/to/file.pdf');
uploader.pause();
uploader.resume();

Cancel/Abort

You can also set cancelIfRunning to true. This will cancel a running task if a task with the same ID is pushed onto the queue.

var uploader = new Queue(function (file, cb) {
  var request = someLongProcess(file, cb); // placeholder: calls cb when the upload ends
  return {
    cancel: function () {
      request.cancel();
    }
  }
}, {
  id: 'path',
  cancelIfRunning: true
})
uploader.push({ path: '/path/to/file.pdf' });
// ... Some time later
uploader.push({ path: '/path/to/file.pdf' });

In the example above, the first upload process is cancelled and the task is requeued.

You can also call .cancel(taskId) to cancel and unqueue the task.

uploader.cancel('/path/to/file.pdf');

Note that if you enable this option in batch mode, it will cancel the entire batch!

back to top


Advanced

Updating Task Status

The process function is run in a context with progressBatch, finishBatch and failedBatch functions.

The example below illustrates how you can use these:

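var uploader = new Queue(function (file, cb) {
  // Call whichever applies; result, bytesUploaded and totalBytes are placeholders.
  this.progressBatch(bytesUploaded, totalBytes, "uploading") // report progress
  this.finishBatch(result)                                   // on success
  // this.failedBatch('some_error')                          // or, on failure
});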
uploader.on('task_finish', function (taskId, result) {
  // Handle finished result
})
uploader.on('task_failed', function (taskId, errorMessage) {
  // Handle error
})
uploader.on('task_progress', function (taskId, completed, total) {
  // Handle task progress
})

uploader.push('/some/file.jpg')
  .on('finish', function (result) {
    // Handle upload result
  })
  .on('failed', function (err) {
    // Handle error
  })
  .on('progress', function (progress) {
    // progress.eta - human readable string estimating time remaining
    // progress.pct - % complete (out of 100)
    // progress.complete - # completed so far
    // progress.total - # for completion
    // progress.message - status message
  })

Update Status in Batch mode (batchSize > 1)

You can also complete individual tasks in a batch by using the failedTask and finishTask functions, and report per-task progress with progressTask.

var uploader = new Queue(function (files, cb) {
  this.failedTask(0, 'some_error')         // files[0] has failed with 'some_error'
  this.finishTask(1, result)               // files[1] has finished with {result}
  this.progressTask(2, 30, 100, "copying") // files[2] is 30% done, currently copying
}, { batchSize: 3 });
uploader.push('/some/file1.jpg')
uploader.push('/some/file2.jpg')
uploader.push('/some/file3.jpg')

Note that if you use *-Task and *-Batch functions together, the batch functions will only apply to the tasks that have not yet finished/failed.
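Here is a hypothetical sketch of that rule. `createBatchTracker` is made up and tracks only each task's status, not results or errors. It also assumes that a task which has already finished or failed ignores later updates.

```javascript
// Hypothetical sketch (not better-queue's worker) of how per-task and
// per-batch updates combine: each task settles at most once, so a batch-level
// call only settles the tasks that are still pending.
function createBatchTracker(size) {
  const status = new Array(size).fill('pending'); // one entry per task in the batch

  const settle = (index, outcome) => {
    if (status[index] === 'pending') status[index] = outcome; // first outcome wins
  };
  const settleAll = (outcome) => status.forEach((_, i) => settle(i, outcome));

  return {
    status,
    finishTask: (index) => settle(index, 'finished'),
    failedTask: (index) => settle(index, 'failed'),
    finishBatch: () => settleAll('finished'),
    failedBatch: () => settleAll('failed'),
  };
}
```

Start with a 3-task batch and call `failedTask(0)`, `finishTask(1)` and then `failedBatch()`. The batch ends up as `['failed', 'finished', 'failed']`.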

Queue Statistics

You can inspect the queue at any time to see the total number of tasks processed, the average processing time in milliseconds, the success rate and the peak number of queued tasks.

var q = new Queue(fn);
var stats = q.getStats();

// stats.total = Total tasks processed
// stats.average = Average process time in milliseconds
// stats.successRate = % success (between 0 and 1)
// stats.peak = Most tasks queued at any given point in time
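Here is a hypothetical sketch of how these four numbers relate. `computeStats` is made up. Its inputs, a list of `{ ok, elapsed }` records and samples of the queue length, are assumptions rather than better-queue internals. So is the choice to average over both successes and failures.

```javascript
// Hypothetical sketch deriving the four getStats() fields from a record of
// finished tasks and samples of the queue length (not better-queue's code).
function computeStats(finished, queueLengthSamples) {
  const total = finished.length;
  const successes = finished.filter((task) => task.ok).length;
  const elapsedSum = finished.reduce((sum, task) => sum + task.elapsed, 0);
  return {
    total,                                      // tasks processed
    average: total ? elapsedSum / total : 0,    // ms per task
    successRate: total ? successes / total : 0, // between 0 and 1
    peak: Math.max(0, ...queueLengthSamples),   // most tasks queued at once
  };
}
```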

back to top


Storage

Using a store

For your convenience, we have added compatibility for a few storage options.

By default, we use an in-memory store that doesn't persist. You can switch to one of our other built-in stores by passing in the store option.

Built-in store

Currently, we support the following stores:

  • memory
  • sql (SQLite, PostgreSQL)

SQLite store (npm install sqlite3)

var q = new Queue(fn, {
  store: {
    type: 'sql',
    dialect: 'sqlite',
    path: '/path/to/sqlite/file'
  }
});

Note that this requires better-queue-sql or better-queue-sqlite.

PostgreSQL store (npm install pg)

var q = new Queue(fn, {
  store: {
    type: 'sql',
    dialect: 'postgres',
    host: 'localhost',
    port: 5432,
    username: 'username',
    password: 'password',
    dbname: 'template1',
    tableName: 'tasks'
  }
});

Please help us add support for more stores; contributions are welcome!

Custom Store

Writing your own store is very easy: you just need to implement a few functions, then either pass your store in the store option or call queue.use(store).

var q = new Queue(fn, { store: myStore });

or

q.use(myStore);

Your store needs the following functions:

q.use({
  connect: function (cb) {
    // Connect to your db
  },
  getRunningTasks: function (cb) {
    // Returns a map of running tasks (lockId => taskIds)
  },
  getTask: function (taskId, cb) {
    // Retrieves a task
  },
  putTask: function (taskId, task, priority, cb) {
    // Save task with given priority
  },
  takeFirstN: function (n, cb) {
    // Removes the first N items (sorted by priority and age)
  },
  takeLastN: function (n, cb) {
    // Removes the last N items (sorted by priority and recency)
  }
})
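As a starting point, here is a hypothetical in-memory store shaped like the interface above. `SketchMemoryStore` is a made-up name. The README does not say what each callback returns, so this sketch makes two assumptions. First, `takeFirstN`/`takeLastN` call back with the removed tasks as a `{ taskId: task }` map. Second, `getRunningTasks` has nothing to report. better-queue may expect more (for example, lock handling), so compare against an existing store such as better-queue-memory before relying on it.

```javascript
// Hypothetical in-memory store shaped like the interface above. Callback
// payloads are assumptions, not better-queue's documented contract.
class SketchMemoryStore {
  constructor() {
    this.tasks = new Map(); // taskId -> { task, priority, seq }
    this.seq = 0;           // increases on every new put; used for age ordering
  }

  connect(cb) { cb(null); }

  // Running tasks are not tracked in this sketch.
  getRunningTasks(cb) { cb(null, {}); }

  getTask(taskId, cb) {
    const entry = this.tasks.get(taskId);
    cb(null, entry ? entry.task : undefined);
  }

  putTask(taskId, task, priority, cb) {
    const existing = this.tasks.get(taskId);
    // A replaced task keeps its original age (an assumption of this sketch).
    const seq = existing ? existing.seq : this.seq++;
    this.tasks.set(taskId, { task, priority: priority || 0, seq });
    cb(null);
  }

  // Highest priority first; ties go to the oldest task.
  takeFirstN(n, cb) { cb(null, this._take(n, (a, b) => a.seq - b.seq)); }

  // Highest priority first; ties go to the newest task.
  takeLastN(n, cb) { cb(null, this._take(n, (a, b) => b.seq - a.seq)); }

  _take(n, byAge) {
    const chosen = [...this.tasks.entries()]
      .sort(([, a], [, b]) => b.priority - a.priority || byAge(a, b))
      .slice(0, n);
    const removed = {};
    for (const [id, entry] of chosen) {
      this.tasks.delete(id);
      removed[id] = entry.task;
    }
    return removed;
  }
}
```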

back to top


Using with Webpack

Better Queue can be used in the browser with the default in-memory store. However, you have to create the store yourself and pass it to the constructor.

var Queue = require('better-queue')
var MemoryStore = require('better-queue-memory')

var q = new Queue(function (input, cb) {

  // Some processing here ...
  var result = input; // placeholder: replace with your real result

  cb(null, result);
}, {
  store: new MemoryStore()
})

TypeScript Support

Better Queue can be used in TypeScript projects by installing type definitions from the Definitely Typed repository:

npm install --save @types/better-queue

Afterwards, you can simply import the library:

import Queue = require('better-queue')

const q: Queue = new Queue(() => {});

back to top


Full Documentation

new Queue(process, options)

The first argument can be either the process function or the options object.

A process function is required (either as the first argument or as the process option); all other options are optional.

  • process - function to process tasks. Will be run with either one single task (if batchSize is 1) or as an array of at most batchSize items. The second argument will be a callback cb(error, result) that must be called regardless of success or failure.

  • filter - function to filter input. Will be run with whatever input was passed to q.push(). If you define this function, then you are expected to call the callback cb(error, task). If an error is sent in the callback then the input is rejected.
  • merge - function to merge tasks with the same task ID. Will be run with oldTask, newTask and a callback cb(error, mergedTask). If you define this function then you are expected to call the callback.
  • priority - function to determine the priority of a task. Takes a task and a callback, and must call cb(error, priority).
  • precondition - function that runs a check before processing to ensure it can process the next batch. Takes a callback cb(error, passOrFail).

  • id - The property to use as the task ID. This can be a string or a function (for more complicated IDs). The function takes (task, cb) and must call the callback with cb(error, taskId).
  • cancelIfRunning - If true, when a task with the same ID is running, its worker will be cancelled. Defaults to false.
  • autoResume - If true, tasks in the store will automatically start processing once the queue connects to the store. Defaults to true.
  • failTaskOnProcessException - If true, when the process function throws an error the batch fails. Defaults to true.
  • filo - If true, tasks will be completed in a first in, last out order. Defaults to false.
  • batchSize - The number of tasks (at most) that can be processed at once. Defaults to 1.
  • batchDelay - Number of milliseconds to delay before starting to pop items off the queue. Defaults to 0.
  • batchDelayTimeout - Number of milliseconds to wait for a new task to arrive before firing off the batch. Defaults to Infinity.
  • concurrent - Number of workers that can be running at any given time. Defaults to 1.
  • maxTimeout - Number of milliseconds before a task is considered timed out. Defaults to Infinity.
  • afterProcessDelay - Number of milliseconds to delay before processing the next batch of items. Defaults to 1.
  • maxRetries - Maximum number of attempts to retry on a failed task. Defaults to 0.
  • retryDelay - Number of milliseconds before retrying. Defaults to 0.
  • storeMaxRetries - Maximum number of attempts before giving up on the store. Defaults to Infinity.
  • storeRetryTimeout - Number of milliseconds to delay before trying to connect to the store again. Defaults to 1000.
  • preconditionRetryTimeout - Number of milliseconds to delay before checking the precondition function again. Defaults to 1000.
  • store - Represents the options for the initial store. Can be an object containing { type: storeType, ... options ... }, or the store instance itself.

Methods on Queue

  • push(task, cb) - Push a task onto the queue, with an optional callback when it completes. Returns a Ticket object.
  • pause() - Pauses the queue: tries to pause running tasks and prevents tasks from getting processed until resumed.
  • resume() - Resumes the queue and its running tasks.
  • destroy(cb) - Destroys the queue: closes the store, tries to clean up.
  • use(store) - Sets the queue to read from and write to the given store.
  • getStats() - Gets the aggregate stats for the queue. Returns an object with properties successRate, peak, total and average, representing the success rate on tasks, peak number of items queued, total number of items processed and average processing time in milliseconds, respectively.
  • resetStats() - Resets all of the aggregate stats.

Events on Queue

  • task_queued - When a task is queued
  • task_accepted - When a task is accepted
  • task_started - When a task begins processing
  • task_finish - When a task is completed
  • task_failed - When a task fails
  • task_progress - When a task progress changes
  • batch_finish - When a batch of tasks (or worker) completes
  • batch_failed - When a batch of tasks (or worker) fails
  • batch_progress - When a batch of tasks (or worker) updates its progress

Events on Ticket

  • accepted - When the corresponding task is accepted (has passed filter)
  • queued - When the corresponding task is queued (and saved into the store)
  • started - When the corresponding task is started
  • progress - When the corresponding task progress changes
  • finish - When the corresponding task completes
  • failed - When the corresponding task fails

better-queue's People

Contributors

bennycode, ddehghan, dominicrj23, favio41, jknielse, leanderlee, mananruck, marcbachmann, npow, psantos9, timdaub, utopiah, yawhide

better-queue's Issues

In service function

Function to check if the queue is in service before pulling things off of the queue -- This would be useful for processes that require certain conditions to be met (internet, environment, other variables) before processing the queue.

It would need to be accompanied by a fixed interval retry to check when the service comes back online.

What about having access to the task id?

Description

I need access to the task id on push or on queued, if this is possible, so I can send it back to the user. I don't know if this is the best approach...

My process goes as follows:

  1. Receive data from user
  2. Create data
  3. Push to queue
  4. On accepted I should respond to user with the id of the process for later use.
  5. User sends me the id to retrieve the

My Approach

Right now I save them myself with a custom id, so if the program restarts they are all lost. It would be really helpful to be able to use the id from the task.

Question

Is it possible to get the task id either on queued or on accepted?

Catch max retries

It seems that no matter what my maxRetry value is, the batch_failed event is called on the first failure.

Is there an event to let me know when it has reached the max retries?

Store does not seem to work

Hi, I have tried using the SQLite store, but the tasks do not seem to be persisted. When I push a new task to a queue, and the task is failing, I expect to find it in the database, at least between task executions, but it's not the case, the table in the db is always empty. I tried to understand what's going on and found out that the release lock function is being called after every run and in the store it's implemented to delete the task from the db. Is that correct? Why is the task not held in the database between executions to make it survive app restarts?

The behavior I expect is actually that a task remains in the db until it finishes or fails after max retries.

IMP: Length property doesn't decrement upon Cancellation of a task

When I call the cancel API, it cancels the task and the store's queue is also decremented, but the length property does not decrement.
uploader.cancel('/path/to/file.pdf');

I believe it is decremented only upon fetching the next task. Since cancel does not trigger fetching the next task, the length property does not decrement.
I'd appreciate it if this could be fixed.

Promise support

Hi,

Now that Node.js 8 has dropped with full async/await support, it would be awesome to get native support for promises in this module. I'm currently using it something like this:

const queue = new Queue((input, cb) => {
  doMyActualStuff(input).then(
    () => setImmediate(cb, null),
    err => setImmediate(cb, err)
  )
})

It would be awesome if it was okay to return a Promise directly from the function, and have it just work™

const queue = new Queue(async (input) => {
  // ...
})

This could be done in a few ways, some of which include:

  1. Make a breaking change and only support promises (my personal favourite but might be a bit too disruptive)
  2. Have a secondary require require('better-queue/promise') that exports the promise-based version
  3. Detect if a promise is returned by the function, and/or detect the functions arity (e.g. mocha looks for both, and complains if the function arity is 2 but it also returns a Promise)

I'd be happy to submit a PR for any of the approaches, or any other alternative :)

Thanks 👋
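One workaround that needs no change to better-queue is Node's built-in `util.callbackify` (available since Node.js 8.2). It turns an async function into one that takes an error-first callback as its last argument. This is a sketch that has not been tested against better-queue; in particular, check that the `this` context (`this.progressBatch` and the other worker functions) still works for your use case. `processInput` is a made-up stand-in.

```javascript
const util = require('util');

// A stand-in for real async work; here it just doubles its input.
async function processInput(input) {
  return input * 2;
}

// util.callbackify returns a function taking the same arguments plus a
// trailing Node-style (err, result) callback, i.e. the (input, cb) shape the
// README's process functions use, so it could be passed as new Queue(...)'s
// first argument.
const processWithCallback = util.callbackify(processInput);
```

A rejected promise is delivered as the callback's first argument. If the rejection reason is falsy, Node wraps it in an Error.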

Types Definition

Feature Request
better-queue lacks a Types definition; either as part of this project or as part of DefinitelyTyped.

Even without types, better-queue works fine with Typescript. Types is simply a "nice to have."

Steps to Replicate
index.ts

import * as Queue from 'better-queue';

let myQueue : Queue = new Queue(fn);

tsc build error -> Could not find a declaration file for module 'better-queue' '/queue.js' implicitly has an 'any' type.

Work Around
Import with:
let Queue = require('better-queue');

webpack

I'd like to use better-queue through webpack, but it dynamically requires the memory store as a fallback. Would it add too much overhead to always require the memory store statically, and just not use it when overridden?

Get current queue size

I am using the in-memory queue, and at some point I reach NodeJs' max heap.

To prevent this, I want to limit my queue to a maximum of X tasks waiting in queue, until it goes down, I don't push anything.

The problem is, I can't find any way to know what the current queue size is.

How to create partitioned queues

I have jobs originating from multiple user accounts landing in a central better-queue, running with concurrency set to one, since the jobs must be run in the order in which they are received, and only one at a time. This works, but is slow because I am forcing synchronous processing among all accounts, which is not really necessary. In reality I just need to make sure that each account's tasks are run synchronously and in the order in which they are received. So I'm trying to figure out the best way to adapt better-queue to this situation. Is there a config option that would create one worker for each 'account-id', or should I be creating a new, independent queue (maybe using classes) for each account, keeping an array of queues? Has anybody come up with a nice solution to this problem with better-queue?
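One way to get per-account ordering without a queue per account is to chain each account's tasks on a promise. This hypothetical helper runs tasks with the same key one at a time in arrival order, while different keys run concurrently. `KeyedSerialRunner` is made up and is not a better-queue option. Creating one `Queue` per account, as suggested above, is an equally valid approach.

```javascript
// Hypothetical helper (not part of better-queue): tasks that share a key run
// one at a time in arrival order; tasks with different keys run concurrently.
class KeyedSerialRunner {
  constructor() {
    this.tails = new Map(); // key -> never-rejecting promise for that key's last task
  }

  // taskFn: () => Promise. Returns a promise for this task's own outcome;
  // the caller is responsible for handling its rejection.
  run(key, taskFn) {
    const previous = this.tails.get(key) || Promise.resolve();
    const current = previous.then(() => taskFn()); // wait for the key's prior task
    const tail = current.catch(() => {});          // a failure must not block later tasks
    this.tails.set(key, tail);
    tail.then(() => {
      // Forget idle keys so the Map does not grow without bound.
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return current;
  }
}
```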

Have concurrency take function?

We've been using better-queue in Gatsby with great success, so thanks for the great design! One thing I'd like to add is the ability to change the concurrency of different queues at different times.

Is this something you've considered? Would you be interested in accepting it? I haven't looked at your code at all, so I'm not sure how involved a PR for this would be, but if you're interested, I'd be happy to take it on.

concurrent problem, wrong order

Hi, I have a problem with concurrent. If I set concurrent to 8 (for example) and add many tasks, the tasks are processed in the wrong order: the first 7 are processed in reverse and the rest are processed normally. There is no easy explanation of how it will behave when I push, for example, 100 tasks. Sometimes just the first 8 are in the wrong order, sometimes the first 16, but then I get something like this: 7 6 5 4 3 2 1 11 10 8 9 12 13 14 15 16. The filo option is set to false, so it should be first in, first out.

need support for delayBatch

Your lib is wonderful, but I need your help configuring it for this scenario (delay sending by 1 minute):

  1. I create a queue that hold all events (time serial event) with FIFO queue.
  2. And will hold for 1 minutes in queue
  3. Continuous send back via call function callback.

I've read all of your documentation and code but cannot figure out how to use your lib for my case. Could you help me?

Sorry for my bad English.

queue.push() doesn't check if task id is currently a worker

Just a clarification on my part. I noticed that long-running tasks aren't considered in the duplicate check on 'id' when a task is pushed into a queue. For example, I could have a queue with concurrency 10 and push in a task that takes 10 minutes to execute. Its id property is long_running_task. Once work begins on it, I can push another task, ``, into the queue and it'll begin working, despite having the same id as the long-running task in the worker. I would have thought that if a task with the same ID is running, any new tasks with the same ID would be rejected/merged?

async/await

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms))
}

const q = new Queue(async (input, cb) => {
  console.log(input)
  await sleep(1000)
  cb()
}, {
  concurrent: 3
})

Right now I still need to call the callback inside async functions. Could the queue automatically wait for asynchronous functions, so that the callback can be removed?

better-queue not working with Electron version 2.0.6

Hey there, I recently upgraded my application to Electron 2.0.6, and now an exception is thrown when I attempt to set the store in the Queue using sqlite. This only happens after the app has been built into a Windows executable by electron-winstaller. The code snippet is:

this.queue = new Queue(this.queueProcessor, {
  store: {
    type: 'sqlite',
    path: 'path/to/file.db3',
  },
});

The better-queue version I'm using is 3.8.10 and better-queue-sqlite is 1.0.5 and electron-winstaller is 2.6.4.
Not sure whether this is an issue here or in Electron so correct me if I'm asking in the wrong place.

Exiting with unfinished jobs

I wrote an app which parses a large XML file with around 17M items of interest. These are added to a queue and, using batch processing, written to a web service. When parsing is complete, the app exits (status code 0) with about half of the items left unwritten. Any idea what might be going on?

merge : Callback is twice called.

When I try to add a duplicate task and handle it with merge (or the default behaviour), the task is merged.
But the callback handler of the duplicate task is called twice, even though after the merge there is only 1 task.

persistent queues aren't persistent

Tasks are lost if the program crashes after the queue has pulled them off.
The problem is that the queue pulled the tasks off before they successfully finished, so now they are gone.

How the library works and is clustering supported?

Please update the README:

  • To outline how the library works.
  • To clarify if clustering is supported or not.

After reading the source code:

I guess clustering (running tasks on many machines) is not supported yet, because:

  • On startup, all running tasks are forcefully resumed. The code doesn't care that the tasks may be being taken by other machines.
  • When taking new tasks, the code only cares about newly inserted tasks. It doesn't consider tasks that have been marked as taken by other machines, even though those machines may have died.

To support clustering with "at least once" semantic, these little modifications can be made ("little" compared to the existing great work):

Delay start of task

I'm looking to use better-queue to debounce incoming tasks. Given a debounce period of 10 seconds, an event stream like:

  1. t0: schedule export for item 0
  2. t5: schedule export for item 0
  3. t10: schedule export for item 1
  4. t20: schedule export for item 0

I want the export for item 0 to start at t15 (1. is cancelled because it's superseded by 2. within 10 seconds) and at t20, and the export for item 1 also at (or close to) t20.

Can this be done with better-queue? I am looking at using the item ids as task ids which would take care of superseding, but not the delayed start. I had planned to set concurrency to 1 -- exports are fairly expensive so I'd rather have only one running at a time.
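Per-ID debouncing can be done in front of the queue with one timer per ID. In this hypothetical sketch (`createDebouncer` is a made-up name), each new request for an ID restarts that ID's timer. Once the ID has been quiet for `wait` ms, only the latest request is handed to `enqueue` (for example, a function that calls `q.push`).

```javascript
// Hypothetical per-ID debouncer placed in front of a queue (not a better-queue
// option). Each schedule() call for an ID restarts that ID's timer; when the
// ID has been quiet for `wait` ms, its latest task is passed to `enqueue`.
function createDebouncer(wait, enqueue) {
  const timers = new Map(); // id -> pending timer

  return function schedule(id, task) {
    clearTimeout(timers.get(id)); // drop a superseded request (no-op if none)
    timers.set(id, setTimeout(() => {
      timers.delete(id);
      enqueue(task);
    }, wait));
  };
}
```

With a 10-second window, the stream above starts item 0 at t15 and item 1 at t20. Under this pure trailing debounce, the second item-0 request at t20 would start at t30, which may differ from what the issue asks for. better-queue's default concurrency of 1 would still keep only one export running at a time.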

Cannot update message on task progress

I fail to understand why you can update the progress message anytime at the batch level, and not at the task level (you can set it only once)

In other words, why this :
(worker.js:176)
self.progress.message = msg || '';

and this :
(worker.js:186)
self.progress.tasks[id].message = self.progress.tasks[id].message || msg;

Anyway, thanks for the tool. Loving it !

Task events not working

Hello,
I tried the following example:

var uploader = new Queue(function (file, cb) {
  this.failedBatch('some_error')
  this.finishBatch(result)
  this.progressBatch(bytesUploaded, totalBytes, "uploading")
});
uploader.on('task_finish', function (taskId, result) {
  // Handle finished result
})
uploader.on('task_failed', function (taskId, errorMessage) {
  // Handle error
})
uploader.on('task_progress', function (taskId, completed, total) {
  // Handle task progress
})
 
uploader.push('/some/file.jpg')
  .on('finish', function (result) {
    // Handle upload result
  })
  .on('failed', function (err) {
    // Handle error
  })
  .on('progress', function (progress) {
    // progress.eta - human readable string estimating time remaining
    // progress.pct - % complete (out of 100)
    // progress.complete - # completed so far
    // progress.total - # for completion
    // progress.message - status message
  })

None of the events attached to the task worked, except that the progress event was triggered once; the events attached to the queue worked fine.

issue with concurrency and db store (sqlite)

Hello, been using better-queue in production for a long time, works great :)

I'm looking to add some persistence to the queue by way of sqlite. When running a test using:

batchSize: 1,
concurrent: 1,
store: {
    type: 'sql',
    dialect: 'sqlite',
    path: '/home/data.db'
},

The moment multiple items are pushed to the queue in a loop, it always executes 2 items immediately. It should be executing one at a time, since concurrency is set to 1. As soon as I remove the store options, it works perfectly fine.

Is there a simple way to fix this?

Why send only the message attribute?

It is up to the application to decide how to deal with the error, but this gets tricky when you only get the error as a string.

Is that really correct?

Worker.js: 72

self.failedBatch(err.message || err);

Should be:

self.failedBatch(err);

Anyway, thank you. Great tool!
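To make the trade-off concrete, here is a small standalone comparison of what a failure handler would receive under each version. `forwardMessage` and `forwardError` are hypothetical stand-ins for the quoted line and the proposed change; the `code` property is just an example of extra context that only survives when the Error object itself is passed on.

```javascript
// Compare what a failure listener receives when the worker forwards
// `err.message || err` versus the original Error object.
function forwardMessage(err) {
  return err.message || err; // the behaviour quoted from worker.js
}

function forwardError(err) {
  return err; // the proposed change: pass the Error through unchanged
}

const original = new Error('connection refused');
original.code = 'ECONNREFUSED'; // extra context an application might branch on

const viaMessage = forwardMessage(original);
const viaError = forwardError(original);

console.log(typeof viaMessage, viaMessage); // string connection refused
console.log(viaError instanceof Error, viaError.code); // true ECONNREFUSED
```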

Clear tasks in queue

Is there a way to clear all tasks in a queue? And also to remove a task by id from the queue before it is processed?
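For reference, the bookkeeping behind both requests is small when pending tasks are kept in an ordered map keyed by id. This is a hypothetical in-memory sketch (`PendingTasks` is not part of better-queue) and says nothing about how better-queue's own stores would support it.

```javascript
// Hypothetical in-memory task list keyed by id, to illustrate the two
// operations asked about: clearing everything and removing one pending task.
// This is NOT better-queue's API; it only shows the bookkeeping involved.
class PendingTasks {
  constructor() {
    this.tasks = new Map(); // Map preserves insertion order (FIFO)
  }

  push(id, task) {
    this.tasks.set(id, task); // re-pushing an id replaces its task in place
  }

  remove(id) {
    return this.tasks.delete(id); // true if a pending task was removed
  }

  clear() {
    const count = this.tasks.size;
    this.tasks.clear();
    return count; // number of tasks discarded
  }

  next() {
    // Take the oldest pending task, or undefined if none remain.
    const first = this.tasks.entries().next();
    if (first.done) return undefined;
    const [id, task] = first.value;
    this.tasks.delete(id);
    return { id, task };
  }
}
```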

Document queue.length

When using better-queue, I needed to check if there are remaining items in the queue (couldn't rely on the drain event). I considered tracking pushed and finished/failed tasks, but I discovered the length property in the library's source. It works fine, however it is not documented.

I'd love to see the property nailed down in the readme, mainly so that renaming or removing it would count as a breaking change. (To be in line with the rest of the API, an accessor method like getRemainingCount() would actually be more fitting. However, the length property is already there, and, even more importantly, it is a very common JavaScript convention.)

I would be happy to submit a PR if there are no objections to the idea.

Make queues faster

One idea is to change taskIds to use numbers by default (they are much faster in JS). Another is to figure out a way to remove the write queue...

error

16:43:15 post /api/auth/authorize? 200 2347.502 ms
16:43:15 info { status: 'cancelled' } (sync.js:213)
16:43:15 err! [console] 
  printErrorAndExit [console] (source-map-support.js:366)
    at Console.error (/Users/yawhide/workspace/diamond/baymax/src/utils/log.js:141:22)
    at printErrorAndExit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:366:13)
    at process.emit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:383:16)
    at process._fatalException (node.js:219:26)
16:43:15 err! [console] /usr/local/lib/node_modules/better-queue/lib/queue.js:277
    var tickets = self._writing[taskId].tickets;
                                       ^
  printErrorAndExit [console] (source-map-support.js:367)
    at Console.error (/Users/yawhide/workspace/diamond/baymax/src/utils/log.js:141:22)
    at printErrorAndExit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:367:13)
    at process.emit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:383:16)
    at process._fatalException (node.js:219:26)
16:43:15 err! [console] TypeError: Cannot read property 'tickets' of undefined
    at Statement.<anonymous> (/usr/local/lib/node_modules/better-queue/lib/queue.js:277:40)
  printErrorAndExit [console] (source-map-support.js:370)
    at Console.error (/Users/yawhide/workspace/diamond/baymax/src/utils/log.js:141:22)
    at printErrorAndExit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:370:11)
    at process.emit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:383:16)
    at process._fatalException (node.js:219:26)
[nodemon] app crashed - waiting for file changes before starting...

Any sample of a worker pulling messages from the queue?

Hi,

I am looking for in-memory queue options in Node, as I want to avoid installing Redis or RabbitMQ for a queue/worker mechanism.

The current sample processes messages in a process function passed when the queue is initialized. Does better-queue have any mechanism where workers can pull messages from the queue? If yes, please point me to an example I can refer to.

Thanks
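In better-queue itself the process function plays the worker role, and the `concurrent` option (seen in an earlier issue) limits how many run at once. If a pull model is really needed, a minimal in-memory version can be sketched in plain JavaScript; `PullQueue`, `worker` and the `null` shutdown message are all hypothetical choices for illustration.

```javascript
// Minimal in-memory pull queue: producers call push(), and workers ask for
// work with pull(), which resolves as soon as a message is available.
// A hypothetical sketch in plain JavaScript, not a better-queue feature.
class PullQueue {
  constructor() {
    this.messages = [];
    this.waiting = []; // resolvers of workers blocked in pull()
  }

  push(message) {
    const resolve = this.waiting.shift();
    if (resolve) {
      resolve(message); // hand directly to a waiting worker
    } else {
      this.messages.push(message);
    }
  }

  pull() {
    if (this.messages.length > 0) {
      return Promise.resolve(this.messages.shift());
    }
    return new Promise((resolve) => this.waiting.push(resolve));
  }
}

// A worker loop that pulls one message at a time until it sees `null`.
async function worker(name, queue, handle) {
  for (;;) {
    const message = await queue.pull();
    if (message === null) return; // hypothetical shutdown signal
    await handle(name, message);
  }
}
```

Several workers can share one `PullQueue`; each `pull()` hands a message to exactly one of them.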

Safe to replace setImmediate with setTimeout?

I am trying to use a webpacked version of better-queue in a Firefox extension, and it looks like setImmediate doesn't work in Firefox. Is it safe to replace all calls to setImmediate with setTimeout? I've tried reading the docs on the JS event loop, but it's not clear to me what the potential downsides are of using setTimeout for better-queue.
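A common middle ground is to keep `setImmediate` where it exists and fall back to `setTimeout(fn, 0)` elsewhere. Both defer the callback until after the current call stack; the main difference is timing, since browsers clamp deeply nested timeouts to at least 4 ms, which could slow down a queue that reschedules itself constantly. This is a sketch, not something the better-queue docs prescribe.

```javascript
// Pick setImmediate where it exists (Node.js) and fall back to a zero-delay
// setTimeout elsewhere (e.g. browsers such as Firefox, which lack setImmediate).
// Both run the callback asynchronously, after the current call stack finishes.
const defer =
  typeof setImmediate === 'function'
    ? setImmediate
    : (fn, ...args) => setTimeout(fn, 0, ...args);

const order = [];
defer(() => order.push('deferred'));
order.push('sync');
// Right now order is ['sync']; 'deferred' is appended on a later tick.
```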

Knex:warning - Pool2 - Error: ResourceRequest timed out

I've come across this error when adding a bunch of jobs. Any idea why this could be happening?
I can attach a code example if needed.

P.S.:
It fires the accepted event and then fails.

Knex:warning - Pool2 - Error: ResourceRequest timed out
Unhandled rejection Error: Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?
    at C:...\node_modules\knex\lib\runner.js:191:30
    at tryCatcher (C:...\node_modules\bluebird\js\release\util.js:16:23)
    at C:...\bluebird\js\release\catch_filter.js:17:41
    at tryCatcher (C:...\node_modules\bluebird\js\release\util.js:16:23)

Update custom store documentation

I've been having problems with the sqlite store, and I'm trying to create a custom store as explained in the docs.

Create the store object:

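// ./custom-store.js
const fs = require('fs-extra'); // readJsonSync comes from the fs-extra package
const storePath = './tasks.json'; // hypothetical path to the persisted tasks
module.exports = {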
  tasks: {},
  connect(cb) {
    console.log('connect');
    this.tasks = fs.readJsonSync(storePath);
    let count = Object.getOwnPropertyNames(this.tasks).length;
    cb(null, count);
  },
  getTask(taskId, cb) {
    console.log('getTask', taskId);
    // Retrieves a task
  },
  putTask(taskId, task, priority, cb) {
    console.log('putTask');
    // Save task with given priority
  },
  takeFirstN(n, cb) {
    console.log('takeFirstN');
    // Removes the first N items (sorted by priority and age)
  },
  takeLastN(n, cb) {
    console.log('takeLastN');
    // Removes the last N items (sorted by priority and recency)
  }
}

Use Store:

let myStore = require('./custom-store');
let queue = new Queue(task, { store: myStore });

When I run it, it throws:

C:...\node_modules\better-queue\lib\queue.js:188
    self._store.getRunningTasks(function (err, running) {
                ^
TypeError: self._store.getRunningTasks is not a function

If getRunningTasks is required, it would be nice to add it to the documentation; I'm up for doing it when I have some time.

Does it work with karma+jasmine ?

Hi,
I'm trying to do some asynchronous operations inside a task that resolves when I get the result and complete some manipulations. It's working properly, except when I try to write test cases for it.
Here's a code example:

myservice.service.ts
public saveToAPI(url: string, id: string, payload: any): Promise<any> {
        return new Promise((resolve, reject) => {
            this.queue.addTask({
                id: `operation-${id}`,
                task: () => {
                    // toPromise() takes no result callback; chain .then() on the promise it returns
                    return this.http.post(url + '/save', payload).toPromise().then((res) => {
                       //do some operations here
                       resolve()
                    }).catch(err => {
                       reject(err)
                    })
                }
            });
        });
    }

myservice.spec.ts

fdescribe('saveToAPI', () => {
    fit('should save payload to API', (done) => {
        service.saveToAPI('https://myurl.com', 'uniqid', {name: 'John Doe'}).then(res => {
            //expect(res).toEqual(something)
            done();
        }).catch(err => {
            fail(err);
            done();
        });

        const request = httpMock.expectOne(
            'https://myurl.com/save'
        );
        request.flush({
            status: 200,
            data: {
                'message': 'Saved successfully'
            }
        });
    })
});

Errors in the console only mention that the httpMock failed to record any request made to the server, and it only happens for functions which use better-queue.

Any ideas ?

Missing unshift method

I need to add something to the top of the queue without having to configure the queue as filo from the start.

There should be a way to either do unshift or change the filo setting after the constructor.

Implementing unshift seems pretty easy. I am happy to do that if you all need it as well.

Access task in queue by id

How can I trigger 'progressBatch', 'finishBatch' or 'failedBatch' for a specific ticket id using the queue?
How can I access a ticket by id?

Precondition delay and retry failure.

The way I read the documentation, the precondition function is called until it passes, as long as there are objects in the queue, and each precondition retry is delayed by the preconditionRetryTimeout.

It looks as if the precondition function is called every time an object is added to the queue, ignoring the delay.

In addition to the delay being ignored, my code makes the queue call the precondition infinitely, and I'm having a hard time figuring out why.

I've set up a queue with the following code:

receiptQueue = new Queue(function (data, cb) {
    printService.print(data)
        .then(function () {
            cb(null, "ok");
        })
        .catch(function (err) {
            cb(err);
        });
}, {
    afterProcessDelay: 500,
    maxRetries: 12,
    retryDelay: 5000,
    precondition: function (cb) {
        printService.isOnline()
            .then(function (result) {
                if (result) {
                    console.log("Online");
                    cb(null, true);
                } else {
                    console.log("Offline");
                    cb(null, false);
                }
            });
    },
    preconditionRetryTimeout: 1000
});

I'm using the latest better-queue and node v8.
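One thing stands out in the code above: the precondition's `isOnline()` promise has no `.catch`, so if it rejects, `cb` is never called, which may leave the queue waiting on the precondition. A hedged sketch of a wrapper that always calls `cb` exactly once; `toPrecondition` is a hypothetical helper, and treating a failed check as "offline" (`cb(null, false)`) is a design choice, not documented better-queue behaviour.

```javascript
// Wrap a promise-returning check so the precondition callback is called
// exactly once: cb(null, true/false) when the check settles normally, and
// cb(null, false) if it rejects or throws synchronously.
function toPrecondition(check) {
  return function (cb) {
    Promise.resolve()
      .then(check) // a synchronous throw inside check becomes a rejection
      .then(
        (online) => cb(null, Boolean(online)),
        () => cb(null, false) // treat a failed check as "not ready yet"
      );
  };
}
```

It would be used as `precondition: toPrecondition(() => printService.isOnline())`, alongside the existing `preconditionRetryTimeout`.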

Ability to flush queue on demand

I am using better-queue to queue up client logs before sending them to a remote server in batches. However, when the user chooses to close my application, any queued logs are lost.

Is there any way to flush the queue on demand so that I can be sure all logs have been sent to the server prior to the application shutting down?
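Whether better-queue itself offers an on-demand flush isn't covered in this thread. As a stand-in, the pattern can be sketched as a small buffer whose `flush()` sends everything pending immediately and can be awaited from a shutdown handler. `LogBuffer` and the `send` function are hypothetical.

```javascript
// Hypothetical log buffer: entries are batched and sent together, and flush()
// sends whatever is pending right away (e.g. from an app-shutdown handler).
// `send` is an assumed async function that delivers one batch to the server.
class LogBuffer {
  constructor(send, batchSize = 50) {
    this.send = send;
    this.batchSize = batchSize;
    this.pending = [];
  }

  add(entry) {
    this.pending.push(entry);
    if (this.pending.length >= this.batchSize) {
      return this.flush(); // full batch: send it now
    }
    return Promise.resolve();
  }

  async flush() {
    if (this.pending.length === 0) return;
    const batch = this.pending; // take ownership of the current batch
    this.pending = [];
    try {
      await this.send(batch);
    } catch (err) {
      this.pending = batch.concat(this.pending); // keep entries for a retry
      throw err;
    }
  }
}
```

On shutdown, `await buffer.flush()` resolves once the last batch has been handed to `send`; if sending fails, the entries stay in `pending`.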

callbacks with 1 concurrent job

Is there an obvious reason why my queue doesn't wait for the callback to be executed before starting the next job?

[Screenshot, 2017-12-21]

At the end of webhookUpdater(), I call done(), but as the next job is queued, it starts before that happens. Am I misunderstanding something? Also, is there a better place to ask questions like this?

Duplicate Check?

Hi there,
first of all great package, keep it up!
Sorry to open an issue; I didn't know where else to ask.

Is there a way to print out the current elements in the queue and/or to check whether the element about to be pushed is already in the queue? I couldn't find anything in the documentation.

Thank you very much!
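One possible approach is to track the ids of queued tasks yourself and skip pushes whose id is already present. A hypothetical sketch: `createDedupingPusher` wraps any push function, and `markDone(id)` would be called from the queue's `task_finish` and `task_failed` listeners so the id can be queued again later.

```javascript
// Hypothetical wrapper that skips tasks whose id is already queued or running.
// `pushFn` stands in for the real queue's push; `markDone(id)` must be called
// when the task finishes or fails so the id can be queued again later.
function createDedupingPusher(pushFn) {
  const inQueue = new Set();

  return {
    push(task) {
      if (inQueue.has(task.id)) return false; // duplicate: ignored
      inQueue.add(task.id);
      pushFn(task);
      return true;
    },
    markDone(id) {
      inQueue.delete(id);
    },
    pendingIds() {
      return [...inQueue]; // snapshot of ids currently tracked
    },
  };
}
```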

Push/completion hooks (not sure what else to call them)

It'd be useful to have some kind of mechanism for running a function upon every push, and just before calling the callbacks for the completion of the associated items. Tracking the progress of items through a pipeline would be made easy by having queues that check in with a reporting system upon the addition and completion of items on the queue.
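Until something like this exists in the library, it can be approximated by wrapping the callback form of `push(task, cb)` shown in the Queuing section. A hypothetical sketch; `withHooks`, `onPush` and `onComplete` are illustrative names, not better-queue options.

```javascript
// Hypothetical wrapper adding push/completion hooks around a callback-style
// push(task, cb) function: onPush runs when a task is added, onComplete runs
// just before the caller's own callback. The wrapped push's return value
// (e.g. a ticket) is passed through unchanged.
function withHooks(pushFn, { onPush = () => {}, onComplete = () => {} } = {}) {
  return function push(task, cb = () => {}) {
    onPush(task);
    return pushFn(task, (err, result) => {
      onComplete(task, err, result);
      cb(err, result);
    });
  };
}
```

With a better-queue instance `q`, it might be used as `const push = withHooks(q.push.bind(q), { onPush, onComplete })`.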

Is this library stable? Should I use RabbitMQ, etc instead?

Hi, I'm sorry for filing so many issues lately, but I was using it for a work project and needed some feedback.

First of all, I really appreciate the work put into this project, and this is just constructive advice.

Let me explain.

Here is the thing: I think it's a really great package that serves a purpose, but I don't think it's stable enough to be used in production or in big apps. At least not for now, mostly because of the stores: they are not 100% reliable, and they fail more than they work... Although the memory store works pretty well (I can explain this in detail if this issue goes through).

Let me rephrase that: it is stable and reliable, although I had some incidents with custom stores (which might have been my fault). My point is that this library will not scale well, and will not work for decentralized or large-scale apps.

What I mean is: say you want to access the queue from 2 parts of the system (one writes, one reads) that run on different servers or containers. In that case, it would be kinda difficult to adapt this lib to that architecture.

If you are new or semi-new, please don't take this too seriously for now; it will not be a problem at all. This is for more advanced apps and systems. This lib will do the job well and with ease.

In the end, I switched to using RabbitMQ as it's more solid and stable for bigger applications and great for decentralized queues.

I'm willing to contribute to this package as I really think it has its purpose and could be very powerful... we could also get inspiration from RabbitMQ...

Let me know what you think, and whether this has already been discussed or I'm giving it more importance than it deserves. Also, let me know if you'd like me to collaborate in some way.

👋 Cheers!

Pushing tasks from within queue function

Hi, I'm trying to push tasks to the queue from within the queue function, e.g.

const Queue = require('better-queue');
const App = {}

function fn(data) {
  if (data.text === 'close_all') {
    const all = getAll()
    all.forEach(one => {
      App.q.push({ text: 'close_one', one })
    })
  }
  if (data.text === 'close_one') {
    closeOne(data.one)
  }
}

App.q = new Queue(fn)
App.q.push({ text: 'close_all' })

close_one is processed normally, but close_all is not.

Is there anything I need to do to make it work as desired?

Thank you,
Alex
