
better-queue's People

Contributors

bennycode, ddehghan, dominicrj23, favio41, jknielse, leanderlee, mananruck, marcbachmann, npow, psantos9, timdaub, utopiah, yawhide


better-queue's Issues

Ability to flush queue on demand

I am using better-queue to queue up client logs prior to sending them to a remote server in batches. However, when the user chooses to close my application, any queued logs are lost.

Is there any way to flush the queue on demand so that I can be sure all logs have been sent to the server prior to the application shutting down?
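A sketch of a shutdown helper built on the queue's events (assuming the 'drain' event fires once every queued task has finished, and using the undocumented length property discussed further down in "Document queue.length"):

function flushAndExit(queue) {
  return new Promise(function (resolve) {
    if (queue.length === 0) return resolve(); // nothing left to send
    queue.on('drain', resolve);               // wait for the last task to finish
  }).then(function () {
    process.exit(0);
  });
}

// e.g. hook it into your app's shutdown path:
// process.on('SIGTERM', function () { flushAndExit(logQueue); });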

Precondition delay and retry failure.

The way I've read the documentation, the precondition function will be called until it passes, as long as there are objects in the queue. Also, the precondition retry will be delayed by whatever the timeout is.

It looks as if every time an object is added to the queue, it calls the precondition function, ignoring the delay.

In addition to the delay being ignored, the queue ends up calling the precondition infinitely, which breaks my code, and I'm having a hard time figuring out why.

I've set up a queue with the following code:

receiptQueue = new Queue(function (data, cb) {
    printService.print(data)
        .then(function () {
            cb(null, "ok");
        })
        .catch(function (err) {
            cb(err);
        });
}, {
    afterProcessDelay: 500,
    maxRetries: 12,
    retryDelay: 5000,
    precondition: function (cb) {
        printService.isOnline()
            .then(function (result) {
                if (result) {
                    console.log("Online");
                    cb(null, true);
                } else {
                    console.log("Offline");
                    cb(null, false);
                }
            });
    },
    preconditionRetryTimeout: 1000
});

I'm using the latest better-queue and node v8.

Get current queue size

I am using the in-memory queue, and at some point I reach Node.js' max heap size.

To prevent this, I want to limit my queue to a maximum of X waiting tasks; until the count goes down, I won't push anything.

The problem is, I can't find any way to know what the current queue size is.
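A minimal back-pressure sketch. It relies on the queue's undocumented length property (see "Document queue.length" below), so treat that as an assumption; the limit is hypothetical.

var MAX_WAITING = 10000; // hypothetical limit

function pushWithBackpressure(queue, task) {
  if (queue.length >= MAX_WAITING) {
    return false; // tell the caller to retry later or drop the task
  }
  queue.push(task);
  return true;
}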

Promise support

Hi,

Now that Node.js 8 has dropped with full async/await support, it would be awesome to get native support for promises in this module. I'm currently using it something like this:

const queue = new Queue((input, cb) => {
  doMyActualStuff(input).then(
    () => setImmediate(cb, null),
    err => setImmediate(cb, err)
  )
})

It would be awesome if it was okay to return a Promise directly from the function, and have it just workβ„’

const queue = new Queue(async (input) => {
  // ...
})

This could be done in a few ways, some of which include:

  1. Make a breaking change and only support promises (my personal favourite, but it might be a bit too disruptive)
  2. Have a secondary entry point, require('better-queue/promise'), that exports the promise-based version
  3. Detect if a promise is returned by the function, and/or detect the function's arity (e.g. mocha looks for both, and complains if the function arity is 2 but it also returns a Promise) -- a rough sketch of this idea follows below
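For reference, a user-land wrapper along the lines of approach 3 (a sketch, not part of the library; adaptWorker is a hypothetical helper name):

function adaptWorker(fn) {
  return function (input, cb) {
    if (fn.length >= 2) {
      return fn(input, cb); // classic (input, cb) worker
    }
    // arity 1: assume the worker returns a promise
    Promise.resolve(fn(input)).then(
      result => setImmediate(cb, null, result),
      err => setImmediate(cb, err)
    );
  };
}

const promiseQueue = new Queue(adaptWorker(async (input) => {
  await doMyActualStuff(input);
}));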

I'd be happy to submit a PR for any of the approaches, or any other alternative :)

Thanks πŸ‘‹

Types Definition

Feature Request
better-queue lacks a type definition, either as part of this project or as part of DefinitelyTyped.

Even without types, better-queue works fine with TypeScript. Types are simply a "nice to have."

Steps to Replicate
index.ts

import * as Queue from 'better-queue';

let myQueue : Queue = new Queue(fn);

tsc build error -> Could not find a declaration file for module 'better-queue' '/queue.js' implicitly has an 'any' type.

Work Around
Import with:
let Queue = require('better-queue');

What about having access to the task id?

Description

I would need to have access to the task id on push or on 'queued', if this is possible, so I can send it back to the user. I don't know if this is the best approach...

My process goes as follows:

  1. Receive data from user
  2. Create data
  3. Push to queue
  4. On accepted I should respond to user with the id of the process for later use.
  5. User sends me the id to retrieve the result

My Approach

Right now I save them myself with a custom id, so if the program restarts they are all lost. It would be really helpful to be able to use the id from the task.

Question

Is it possible to get the task id either on 'queued' or on 'accepted'?
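One way to do this today (a sketch, assuming the documented id option, which lets you name the property to use as the task id; worker, enqueue, and respond are placeholder names): generate the id yourself before pushing, so it can be returned to the user immediately and is stored with the task.

const crypto = require('crypto');

const queue = new Queue(worker, { id: 'taskId' });

function enqueue(data, respond) {
  const taskId = crypto.randomBytes(8).toString('hex');
  queue.push({ taskId: taskId, payload: data });
  respond({ id: taskId }); // the user can send this id back later
}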

Pushing tasks from within queue function

Hi, I'm trying to push a task to the queue from within the queue function, e.g.

const App = {}

const fn = (data, cb) => {
  if (data.text === 'close_all') {
    const all = getAll()
    all.forEach(one => {
      App.q.push({ text: 'close_one', one })
    })
  }
  if (data.text === 'close_one') {
    closeOne(data.one)
  }
  cb() // signal task completion
}

App.q = new Queue(fn)
App.q.push({ text: 'close_all' })

close_one tasks are processed normally, but close_all is not.

Is there anything I need to do to make it work as desired please?

Thank you,
Alex
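A hedged workaround sketch (a guess, not a confirmed fix): defer the nested pushes to the next tick with setImmediate, so they happen after the current task has called back, in case re-entrant pushes from inside the worker are the problem.

const fn = (data, cb) => {
  if (data.text === 'close_all') {
    const all = getAll()
    setImmediate(() => {
      // push after the current task has had a chance to finish
      all.forEach(one => App.q.push({ text: 'close_one', one }))
    })
  }
  if (data.text === 'close_one') {
    closeOne(data.one)
  }
  cb()
}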

async/await

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms))
}

const q = new Queue(async (input, cb) => {
  console.log(input)
  await sleep(1000)
  cb()
}, {
  concurrent: 3
})

Right now I still need to call the callback inside the async function. Could the queue automatically await asynchronous functions, so that the callback can be removed?

Knex:warning - Pool2 - Error: ResourceRequest timed out

I've come across this error when adding a bunch of jobs; any idea why this could be happening?
I can attach a code example if needed.

PS:
It fires the accepted event and then fails.

Knex:warning - Pool2 - Error: ResourceRequest timed out
Unhandled rejection Error: Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?
    at C:...\node_modules\knex\lib\runner.js:191:30
    at tryCatcher (C:...\node_modules\bluebird\js\release\util.js:16:23)
    at C:...\bluebird\js\release\catch_filter.js:17:41
    at tryCatcher (C:...\node_modules\bluebird\js\release\util.js:16:23)

Store does not seem to work

Hi, I have tried using the SQLite store, but the tasks do not seem to be persisted. When I push a new task to a queue, and the task is failing, I expect to find it in the database, at least between task executions, but it's not the case, the table in the db is always empty. I tried to understand what's going on and found out that the release lock function is being called after every run and in the store it's implemented to delete the task from the db. Is that correct? Why is the task not held in the database between executions to make it survive app restarts?

The behavior I expect is actually that a task remains in the db until it finishes or fails after max retries.

Push/completion hooks (not sure what else to call them)

It'd be useful to have some kind of mechanism for running a function upon every push, and just before calling the callbacks for the completion of the associated items. Tracking the progress of items through a pipeline would be made easy by having queues that check in with a reporting system upon the addition and completion of items on the queue.
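The existing events may already cover this (a sketch, assuming the 'task_queued', 'task_finish' and 'task_failed' events; the reporting object is hypothetical):

queue.on('task_queued', function (taskId, task) {
  reporting.trackPushed(taskId, task);     // item entered the pipeline
});
queue.on('task_finish', function (taskId, result) {
  reporting.trackCompleted(taskId, result); // item completed
});
queue.on('task_failed', function (taskId, err) {
  reporting.trackFailed(taskId, err);       // item failed
});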

Update custom store documentation

I've been having problems with the sqlite store, and I'm trying to create a custom store as explained in the docs.

Create the store object:

// ./custom-store.js
const fs = require('fs-extra');          // readJsonSync comes from fs-extra
const storePath = './queue-tasks.json';  // hypothetical path for the persisted tasks

module.exports = {
  tasks: {},
  connect(cb) {
    console.log('connect');
    this.tasks = fs.readJsonSync(storePath);
    let count = Object.getOwnPropertyNames(this.tasks).length;
    cb(null, count);
  },
  getTask(taskId, cb) {
    console.log('getTask', taskId);
    // Retrieves a task
  },
  putTask(taskId, task, priority, cb) {
    console.log('putTask');
    // Save task with given priority
  },
  takeFirstN(n, cb) {
    console.log('takeFirstN');
    // Removes the first N items (sorted by priority and age)
  },
  takeLastN(n, cb) {
    console.log('takeLastN');
    // Removes the last N items (sorted by priority and recency)
  }
}

Use Store:

let myStore = require('./custom-store');
let queue = new Queue(task, { store: myStore });

When running it throws:

C:...\node_modules\better-queue\lib\queue.js:188
    self._store.getRunningTasks(function (err, running) {
                ^
TypeError: self._store.getRunningTasks is not a function

If getRunningTasks is required, it would be nice to add it to the documentation; I'm up for doing it when I have some time.
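For reference, the built-in memory store seems to implement a few methods beyond those shown in the docs. A rough stub of the missing pieces that could be appended to ./custom-store.js (signatures inferred from better-queue's memory store, so treat them as assumptions):

Object.assign(module.exports, {
  deleteTask(taskId, cb) {
    console.log('deleteTask', taskId);
    delete this.tasks[taskId];   // remove the task from storage
    cb();
  },
  getLock(lockId, cb) {
    console.log('getLock', lockId);
    cb(null, {});                // tasks held under this lock, keyed by task id
  },
  getRunningTasks(cb) {
    console.log('getRunningTasks');
    cb(null, {});                // currently locked (running) tasks, grouped by lock id
  },
  releaseLock(lockId, cb) {
    console.log('releaseLock', lockId);
    cb();                        // forget the lock once its tasks have completed
  }
});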

Duplicate Check?

Hi there,
first of all great package, keep it up!
Sorry to open an issue, didn't know where to ask.

Is there a possibility to print out the current elements in the queue, and/or to check whether the element about to be pushed is already in the queue? I couldn't find anything in the documentation.

Thank you very much!
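There doesn't seem to be a public API for listing queued tasks, but a small wrapper can track what is currently queued (a sketch, assuming id-based pushes and the 'task_finish'/'task_failed' events). Note that better-queue already merges a pushed task whose id is still queued, per the merge option.

const pending = new Set();

function pushUnique(queue, id, task) {
  if (pending.has(id)) return false;            // already in the queue
  pending.add(id);
  queue.push(Object.assign({ id: id }, task));
  return true;
}

queue.on('task_finish', function (taskId) { pending.delete(taskId); });
queue.on('task_failed', function (taskId) { pending.delete(taskId); });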

Document queue.length

When using better-queue, I needed to check if there are remaining items in the queue (couldn't rely on the drain event). I considered tracking pushed and finished/failed tasks, but I discovered the length property in the library's source. It works fine, however it is not documented.

I'd love to see the property nailed down in the readme, mainly to make it a breaking change when it'd be renamed or removed. (To be in line with the rest of the API, an accessor method like getRemainingCount() would actually be more fitting. However, the length property is already there. Also, even more importantly, it is a very common JavaScript convention.)

I would be happy to attach a PR if there are no objections against the idea?

merge: callback is called twice

When I try to add a duplicate task and handle it with the merge/default behaviour, it merges the task.
But the callback handler of the duplicate task is called twice, even though after the merge there is only 1 task.

Cannot update message on task progress

I fail to understand why you can update the progress message at any time at the batch level, but not at the task level (you can set it only once).

In other words, why this :
(worker.js:176)
self.progress.message = msg || '';

and this :
(worker.js:186)
self.progress.tasks[id].message = self.progress.tasks[id].message || msg;

Anyway, thanks for the tool. Loving it !

In service function

A function to check whether the queue is in service before pulling things off the queue -- this would be useful for processes that require certain conditions to be met (internet, environment, other variables) before processing the queue.

It would need to be accompanied by a fixed interval retry to check when the service comes back online.

How to create partitioned queues

I have jobs originating from multiple user accounts landing in a central better-queue, running with concurrency set to one, since the jobs must be run in the order in which they are received, and only one at a time. This works, but is slow because I am forcing synchronous processing across all accounts, which is not really necessary. In reality I just need to make sure that each account's tasks are run serially and in the order in which they are received. So I'm trying to figure out the best way to adapt better-queue to this. Is there a config option that would create one worker for each account id, or should I be creating a new, independent queue for each account (maybe using classes), keeping an array of queues? Has anybody come up with a nice solution to this problem with better-queue?
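One common pattern (a sketch, not a built-in feature; worker is a placeholder): keep an independent queue per account, so each account processes serially while different accounts run in parallel.

const queues = new Map();

function getAccountQueue(accountId) {
  if (!queues.has(accountId)) {
    // one serial queue per account
    queues.set(accountId, new Queue(worker, { concurrent: 1 }));
  }
  return queues.get(accountId);
}

// usage:
// getAccountQueue(job.accountId).push(job);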

Catch max retries

It seems that no matter what my maxRetries value is, the batch_failed event is called on the first failure.

Is there an event to let me know when it has reached the max retries?

Task events not working

Hello,
I tried the following example:

var uploader = new Queue(function (file, cb) {
  this.failedBatch('some_error')
  this.finishBatch(result)
  this.progressBatch(bytesUploaded, totalBytes, "uploading")
});
uploader.on('task_finish', function (taskId, result) {
  // Handle finished result
})
uploader.on('task_failed', function (taskId, errorMessage) {
  // Handle error
})
uploader.on('task_progress', function (taskId, completed, total) {
  // Handle task progress
})
 
uploader.push('/some/file.jpg')
  .on('finish', function (result) {
    // Handle upload result
  })
  .on('failed', function (err) {
    // Handle error
  })
  .on('progress', function (progress) {
    // progress.eta - human readable string estimating time remaining
    // progress.pct - % complete (out of 100)
    // progress.complete - # completed so far
    // progress.total - # for completion
    // progress.message - status message
  })

None of the events attached to the task worked, except that the progress event was triggered only once; the events attached to the queue worked fine.

Why send only the message attribute?

It is up to the application to decide how to deal with the error. But this gets tricky when you get only the error as a string.

Is that really correct?

Worker.js: 72

self.failedBatch(err.message || err);

Should be:

self.failedBatch(err);

Anyway, thank you. Great tool!

webpack

I'd like to use better-queue through webpack, but it dynamically requires the memory store as a fallback. Would it add too much overhead to always require the memory store statically, and just not use it when overridden?

need support for delayBatch

Your lib is wonderful, but I need your help configuring this scenario -- delay sending by 1 minute:

  1. I create a FIFO queue that holds all events (time-series events).
  2. Each event is held in the queue for 1 minute.
  3. Then it is continuously sent back via the callback function.

I've read your full documentation and code but cannot see how to express my case with your lib. Could you help me?

Sorry for my bad English.

Make queues faster

One idea is to change taskIds to use numbers by default (they are much faster in JS). Another is to figure out a way to remove the write queue...

Have concurrency take function?

We've been using better-queue in Gatsby with great success, so thanks for the great design! One thing I'd like to add is the ability to change the concurrency of different queues at different times.

Is this something you've considered, and would you be interested in taking a PR? I haven't looked at your code at all, so I'm not sure how involved a PR for this would be, but if you're interested, I'd be happy to take it on.

Exiting with unfinished jobs

I wrote an app which parses a large XML file with around 17M items of interest. These are added to a queue and, using batch processing, written to a web service. When parsing is complete, the app exits (status code 0) with about half of the items left unwritten. Any idea what might be going on?

better-queue not working with Electron version 2.0.6

Hey there, recently I upgraded my application to use Electron 2.0.6, and now an exception is thrown when I attempt to set the store in the Queue using sqlite. This only happens after the application has been built into a Windows executable by electron-winstaller. The code snippet is:

this.queue = new Queue(this.queueProcessor, {
      store: {
        type: 'sqlite',
        path: 'path/to/file.db3',
      },
    });

The better-queue version I'm using is 3.8.10 and better-queue-sqlite is 1.0.5 and electron-winstaller is 2.6.4.
Not sure whether this is an issue here or in Electron so correct me if I'm asking in the wrong place.

Is this library stable? Should I use RabbitMQ, etc instead?

Hi, I'm sorry for filing so many issues lately but I was using it for a work project and needed some feedback.

First of all, I really appreciate the work put into this project, and this is just constructive advice.

Let me explain

Here is the thing: I think it's a really great package that serves a purpose, but I don't think it's mature enough to be considered stable or to be used in production or in big apps. At least not for now, mostly because of the stores; they're not 100% reliable, and they fail more than they work... although the memory store works pretty well (I can explain this in detail if this issue goes through).

Let me rephrase that: it is stable and reliable, although I had some incidents with custom stores (which might have been my fault). My point is that this library will not scale well, and will not work for decentralized apps or large-scale apps.

What I mean is: say, for example, you want to be able to access the queue from 2 parts of the system (one writes, one reads) that are on different servers or containers; in this case, it would be kinda difficult to adapt this lib to that architecture.

If you are new or semi-new, please don't take this too seriously for now; it will not be a problem at all, as this concern applies to more advanced apps and systems. This lib will do the job well and with ease.

In the end, I switched to using RabbitMQ as it's more solid and stable for bigger applications and great for decentralized queues.

I'm willing to contribute to this package as I really think it has its purpose and could be very powerful... we could also get inspiration from RabbitMQ...

Let me know what you think, and if this has already been talked about or I'm giving it more importance than it has. Also let me know if you'd like me to collaborate in some way.

πŸ‘‹ Cheers!

error

16:43:15 post /api/auth/authorize? 200 2347.502 ms
16:43:15 info { status: 'cancelled' } (sync.js:213)
16:43:15 err! [console] 
  printErrorAndExit [console] (source-map-support.js:366)
    at Console.error (/Users/yawhide/workspace/diamond/baymax/src/utils/log.js:141:22)
    at printErrorAndExit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:366:13)
    at process.emit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:383:16)
    at process._fatalException (node.js:219:26)
16:43:15 err! [console] /usr/local/lib/node_modules/better-queue/lib/queue.js:277
    var tickets = self._writing[taskId].tickets;
                                       ^
  printErrorAndExit [console] (source-map-support.js:367)
    at Console.error (/Users/yawhide/workspace/diamond/baymax/src/utils/log.js:141:22)
    at printErrorAndExit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:367:13)
    at process.emit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:383:16)
    at process._fatalException (node.js:219:26)
16:43:15 err! [console] TypeError: Cannot read property 'tickets' of undefined
    at Statement.<anonymous> (/usr/local/lib/node_modules/better-queue/lib/queue.js:277:40)
  printErrorAndExit [console] (source-map-support.js:370)
    at Console.error (/Users/yawhide/workspace/diamond/baymax/src/utils/log.js:141:22)
    at printErrorAndExit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:370:11)
    at process.emit (/Users/yawhide/workspace/diamond/baymax/node_modules/longjohn/node_modules/source-map-support/source-map-support.js:383:16)
    at process._fatalException (node.js:219:26)
[nodemon] app crashed - waiting for file changes before starting...

Safe to replace setImmediate with setTimeout?

I am trying to use a webpacked version of better-queue in a Firefox extension, and it looks like setImmediate doesn't work in Firefox. Is it safe to replace all calls to setImmediate with setTimeout? I've tried reading the docs on the JS event loop, but it's not clear to me what the potential downsides are of using setTimeout for better-queue.
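A hedged shim sketch (not a drop-in guarantee): setTimeout(fn, 0) schedules a macrotask much like setImmediate, but browsers clamp nested timers (typically to 4 ms), so throughput can drop and ordering relative to I/O callbacks can differ.

if (typeof setImmediate === 'undefined') {
  window.setImmediate = function (fn) {
    // forward any extra arguments, mirroring setImmediate(fn, ...args)
    var args = Array.prototype.slice.call(arguments, 1);
    return setTimeout(function () { fn.apply(null, args); }, 0);
  };
}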

Access task in queue by id

How can I trigger 'progressBatch', 'finishBatch' or 'failedBatch' for a specific ticket id using the queue?
How can I access a ticket by id?

callbacks with 1 concurrent job

Is there an obvious reason why my queue doesn't wait for the callback to be executed before starting the next job?

[screenshot of the worker code, 2017-12-21]

At the end of webhookUpdater(), I call done(), but as the next job is queued, it starts before that happens. Am I misunderstanding something? Also, is there a better place to ask questions like this?

queue.push() doesn't check if task id is currently a worker

Just a clarification on my part. I was noticing that long-running tasks aren't considered for duplicate checking on 'id' when pushed into a queue. For example, I could have a queue with concurrency 10, and push in a task that'll take 10 minutes to execute. Its id property is long_running_task. Once work begins on it, I can push another task, ``, into the queue and it'll begin working, despite having the same id as the long-running task in the worker. I would have thought that if there's a task with the same ID that's working, any new tasks with the same ID would be rejected/merged?

issue with concurrency and db store (sqlite)

Hello, been using better-queue in production for a long time, works great :)

I'm looking to add some persistence to the queue by way of sqlite. When running a test using:

batchsize: 1,
concurrent: 1,
store: {
    type: 'sql',
    dialect: 'sqlite',
    path: '/home/data.db'
},

The second that multiple items are pushed to the queue in a loop, it always executes 2 items immediately. It should be executing one at a time, since concurrency is set to 1. As soon as I remove the store options, it works perfectly fine.

Is there a simple way to fix this?

Missing unshift method

I need to add something to the top of the queue without having to configure the Queue as FILO from the start.

There should be a way to either do unshift or change the filo setting after the constructor.

It seems implementing unshift is pretty easy. I am happy to do that if you all need it as well.
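A possible workaround until unshift exists (a sketch, assuming the priority option and that higher-priority tasks are taken first; worker is a placeholder):

const queue = new Queue(worker, {
  priority: function (task, cb) {
    cb(null, task.urgent ? 10 : 1); // urgent tasks jump ahead of normal ones
  }
});

// "unshift"-like push:
// queue.push({ urgent: true, payload: data });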

concurrent problem, wrong order

Hi, I have a problem with concurrency: if I have concurrent set to 8 (for example) and add many tasks, the tasks are done in the wrong order. The first 7 are done in inverted order, and the rest are done normally. There is no easy explanation of how it will behave when I push 100 tasks, for example. Sometimes just the first 8 are in the wrong order, sometimes the first 16, but then I get something like this: 7 6 5 4 3 2 1 11 10 8 9 12 13 14 15 16. The filo option is set to false, so first in, first out.

Clear tasks in queue

Is there a way to clear all tasks in a queue? And also to cancel a task by id, so it can be removed from the queue before being processed?
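Cancelling a single task by id appears to be supported (a sketch using the cancel API shown elsewhere in these issues); clearing everything would mean tracking the queued ids yourself, e.g. with the Set pattern from "Duplicate Check?" above.

queue.cancel('some-task-id'); // removes the task if it has not started yet

// hypothetical "clear all", given a Set of pending ids:
// pending.forEach(function (id) { queue.cancel(id); });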

Any Sample for worker pulling messages from queue?

Hi,

I am looking for in-memory queue options in Node, as I want to avoid installing Redis or RabbitMQ for a queue/worker mechanism.

The current sample shows messages being processed by the function passed when initializing the queue. Is there any mechanism in better-queue where workers can pull messages from the queue? If yes, please point to an example I can refer to.

Thanks

Delay start of task

I'm looking to use better-queue to debounce incoming tasks. Given a debounce period of 10 seconds, and an event stream like:

  1. t0: schedule export for item 0
  2. t5: schedule export for item 0
  3. t10: schedule export for item 1
  4. t20: schedule export for item 0

I want the export for item 0 to start at t15 (1. is cancelled because it's superseded by 2. within 10 seconds) and at t20, and the export for item 1 also at (or close to) t20.

Can this be done with better-queue? I am looking at using the item ids as task ids which would take care of superseding, but not the delayed start. I had planned to set concurrency to 1 -- exports are fairly expensive so I'd rather have only one running at a time.
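One way to get the debounce outside the queue (an assumed approach, not a built-in option; exportQueue and runExport are placeholders): hold each item id on a timer and only push once the debounce window passes without another request; with item ids as task ids and concurrency 1, the queue then handles ordering.

const DEBOUNCE_MS = 10000;
const timers = new Map();

function scheduleExport(itemId) {
  clearTimeout(timers.get(itemId)); // supersede any pending request for this item
  timers.set(itemId, setTimeout(function () {
    timers.delete(itemId);
    exportQueue.push({ id: itemId });
  }, DEBOUNCE_MS));
}

// exportQueue = new Queue(runExport, { concurrent: 1 })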

persistent queues aren't persistent

Tasks are pulled off the queue and then the program crashes.
The problem is that the queue pulled the tasks off the store before they successfully finished, so now they are gone.

How the library works and is clustering supported?

Please update the README:

  • To outline how the library works.
  • To clarify if clustering is supported or not.

After reading the source code:

I guess clustering (running tasks on many machines) is not supported yet, because:

  • On startup, all running tasks are forcefully resumed. The code doesn't care that the tasks may be being taken by other machines.
  • When taking new tasks, the code only cares about newly inserted tasks. It doesn't consider tasks that have been marked as taken by other machines, even though those machines may have died.

To support clustering with "at least once" semantics, these little modifications could be made ("little" compared to the existing great work):

Does it work with karma+jasmine ?

Hi,
I'm trying to do some asynchronous operations inside a task, which resolves when I get the result and complete some manipulations. It's working properly, except when I try to write test cases for it.
Here's a code example:

myservice.service.ts
public saveToAPI(url: string, id: string, payload: any): Promise<any> {
    return new Promise((resolve, reject) => {
        this.queue.addTask({
            id: `operation-${id}`,
            task: () => {
                return this.http.post(url + '/save', payload).toPromise()
                    .then((res) => {
                        // do some operations here
                        resolve();
                    })
                    .catch(err => {
                        reject(err);
                    });
            }
        });
    });
}

myservice.spec.ts

fdescribe('saveToAPI', () => {
    fit('should save payload to API', (done) => {
        service.saveToAPI('https://myurl.com', 'uniqid', {name: 'John Doe'}).then(res => {
            //expect(res).toEqual(something)
            done();
        }).catch(err => {
            fail(err);
            done();
        });

        const request = httpMock.expectOne(
            'https://myurl.com/save'
        );
        request.flush({
            status: 200,
            data: {
                'message': 'Saved successfully'
            }
        });
    })
});

Errors in the console only mention that the httpMock failed to record any request made to the server, and it only happens for functions which use better-queue.

Any ideas ?

IMP: Length property doesn't decrement upon Cancellation of a task

When I call the cancel API, it cancels the task and the store queue also gets decremented, but the length property does not decrement.
uploader.cancel('/path/to/file.pdf');

I believe it is decremented only upon fetching the next task, and since cancel does not trigger fetching the next task, the length property does not decrement.
I would appreciate it if this could be fixed.
