Giter Site home page Giter Site logo

pg-boss's Introduction

Queueing jobs in Node.js using PostgreSQL like a boss.

PostgreSql Version npm version Build Coverage Status

async function readme() {
  const PgBoss = require('pg-boss');
  const boss = new PgBoss('postgres://user:pass@host/database');

  boss.on('error', error => console.error(error));

  await boss.start();

  const queue = 'some-queue';

  let jobId = await boss.send(queue, { param1: 'foo' })

  console.log(`created job in queue ${queue}: ${jobId}`);

  await boss.work(queue, someAsyncJobHandler);
}

async function someAsyncJobHandler(job) {
  console.log(`job ${job.id} received with data:`);
  console.log(JSON.stringify(job.data));

  await doSomethingAsyncWithThis(job.data);
}

pg-boss is a job queue built in Node.js on top of PostgreSQL in order to provide background processing and reliable asynchronous execution to Node.js applications.

pg-boss relies on SKIP LOCKED, a feature added to postgres specifically for message queues, in order to resolve record locking challenges inherent with relational databases. This brings the safety of guaranteed atomic commits of a relational database to your asynchronous job processing.

This will likely cater the most to teams already familiar with the simplicity of relational database semantics and operations (SQL, querying, and backups). It will be especially useful to those already relying on PostgreSQL that want to limit how many systems are required to monitor and support in their architecture.

Features

  • Exactly-once job delivery
  • Backpressure-compatible polling workers
  • Cron scheduling
  • Pub/sub API for fan-out queue relationships
  • Deferral, retries (with exponential backoff), rate limiting, debouncing
  • Completion jobs for orchestrations/sagas
  • Direct table access for bulk loads via COPY or INSERT
  • Multi-master compatible (for example, in a Kubernetes ReplicaSet)
  • Automatic creation and migration of storage tables
  • Automatic maintenance operations to manage table growth

Requirements

  • Node 16 or higher
  • PostgreSQL 11 or higher

Installation

# npm
npm install pg-boss

# yarn
yarn add pg-boss

Documentation

Contributing

To setup a development environment for this library:

git clone https://github.com/timgit/pg-boss.git
npm install

To run the test suite, linter and code coverage:

npm run cover

The test suite will try and create a new database named pgboss. The config.json file has the default credentials to connect to postgres.

pg-boss's People

Contributors

abelsoares avatar adamhamlin avatar amc6 avatar bhsiaonflx avatar bradleyayers avatar chougron avatar dependabot[bot] avatar druska avatar fiznool avatar ilijanl avatar jhermsmeier avatar jordaaash avatar jrf0110 avatar klesgidis avatar lavredisg avatar lionl avatar mamannn avatar marcbachmann avatar mrmurphy avatar nicoburns avatar phips28 avatar polesen avatar robgraeber avatar simoneb avatar simplyspoke avatar stnwk avatar tcoats avatar thakkaryash94 avatar timgit avatar yammine avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

pg-boss's Issues

No retry for failed jobs

I am using done(error) if an error occurs.
retrylimit = 2
But it seems to ignore the retry settings for a failed job.

Is this on purpose?

Publish an array of jobs

Now the publish() function process a job at a time and if there are multiple jobs to publish, we run the publish() function in a loop. Given that inserting multiple rows in a query is more efficient, it seems to be a good idea to extend the function to accept an array of requests.

In our use case, our system receives data update notifications and publishes several different jobs for each notification. We are considering ways to send fewer queries in order to reduce I/O.

If the publication fails, all jobs fail. But I think the user can use the function at his own risk.

Archive settings missing in config docs

2.5.0: Configs for

Archive: deleteArchivedJobsEvery and deleteCheckInterval settings added for defining job retention. The default retention interval is 7 days.

is missing in the config docs; only in the changelog.

Would like to see a big picture overview

I've never used a transactional queue before and find myself having to infer the overall design from the API and configuration docs. I'm not sure I'm getting it right. It would be nice for the docs to include a big picture overview.

It seems that you implement a single queue of virtual queues, each virtual queue corresponding to a job name. Each job consumer subscribes to a virtual queue by job name.

When a job consumer receives a job, it processes the job and then calls a queue-provided callback to report job completion. The job may then be archived, meaning that it is removed from the virtual job queues and yet somehow also still available.

Questions:

(1) How does a job consumer report failure, such as to requeue the job for another attempt later?
(2) Can failed jobs be requeued with a delay not indicated when the job was originally queued? (I'll be downloading web pages, may want to wait before retrying.)
(3) Can I process jobs statefully? In my case, I need to queue a job to load an image from the web. Once successfully loaded, I then need to queue the job for conversion to a thumbnail. Can I just change the job name to put the job in a new queue? Or is this a dequeuing of the first job and a queuing of a second job in a new virtual queue?
(4) I'm not seeing an option not to archive jobs. Is archival somehow necessary? My project doesn't seem to need archival, because each job results in a downstream database record.
(5) How do I access archived jobs? How do I get rid of old ones? Would I need to code to your schema, or do you somehow provide access?

It would help to not only have answers to these particular questions, but also clarification of the big picture that such questions suggest I'm missing. Thank you! Looking forward to using your module...

Pulling jobs on demand

This proposal addresses issues #4 and #5, while still allowing all existing functionality.

Right now subscribers are rate-limited by the pre-configured newJobCheckInterval. Server capacity is going to vary over time. At present, a server that finds itself in a position to handle multiple jobs at once only has the option of setting a teamSize on subscription. However, the teamSize persists independently of the current server performance capacity, so the server can't use this option to vary work load.

If pg-boss provided an API for pulling a single job, the caller could dynamically decide how many simultaneous jobs it can handle and how often it can do jobs. If the caller wants to pull one job per interval of time, as pg-boss currently requires, the caller can still do that too with a pull-based API. If it were important for the queue interface to provide this functionality, it can either provide an additional method or a wrapper object to do the job.

The only potential issue I can see is that this interface would not be compatible with receiving asynchronous notification of new jobs from the database. (I understand that PG provides a NOTIFY feature, but I don't know how it's used or whether it can reach client computers.) It would be nice to have an API that could handle this as well, even if it were a separate feature.

Transactional migrations

@phips28, I'm creating this issue to track what you reported in #61. Since pg-boss uses pg's pool.query, implementing this change may require special attention from all users that pass in an existing database connection since it may or may not support transactions.

From https://node-postgres.com/features/transactions:
image

Another altenative I could try is squashing all migration scripts into a single multiple command statement. The docs don't seem to encourage that, though. There's probably other options as well.

My assumption at this point is that even if callers import a db prop during construction, the majority of them will finally resolve to the same pg driver in node. However, if they're highly abstracted, like strongloop's loopback connector, they may never have that level of control over the driver.

How to report failure

If a job callback fails asynchronously, done() doesn't appear to take an argument in typical node style. I read through #3 and my question is the same as OP's first:

(1) How does a job consumer report failure, such as to requeue the job for another attempt later?

I'm not necessarily looking to retry the failed job, but I'd like to tell Boss that the job failed as soon as it does fail, rather than waiting for the timeout. Is this possible?

If not, would you be open to a PR accepting an error argument to the done() call? I would think this would be non-breaking.

Publish job only if there is'nt an active one already

Hi!

Thank you for PG Boss! We use it in production and it works really well! πŸŽ‰

A question though, as I understand it there is no way currently to prevent a job from beeing queued if there is one queued or active at the moment? Would it make sense to add this?

Our use case is that we want to have a "clock" process that queues jobs on a regular basis, stuff that needs to happen every minute for instance. But we wouldn't want to run multiple jobs at the same time, that would be really wasteful, especially if a job takes longer then the interval for some reason.

Optionally support knex

I'm working on integrating a queue into my feathers/knex app. Given that I already have a knex instance being passed around the app that is already a pooled pg connection, I'd love to be able to supply my own db 'adapter'. Basically all that would be needed is to support a 'db' option in the constructor and use that instead of constructing your own instance. I can mimic the API of your adapter easily enough.

Thoughts?

Create jobs in the database using triggers

I would like to create jobs from within the database and have boss workers process them (sending emails and such). Ideally, I would use a trigger on some table that creates the job (based on rules). Can I create a job with SQL in a trigger?

Clearer assertion message

Right now the assert() function is widely used in the code to validate execution results. This is very nice but sometimes the assertion message is not very clear because the assertion is written in a compact form.

Example

In the manager.setStateForJob(), an assertion is used to determine whether the update is successful:

assert(result.rowCount === 1, `${actionName}(): Job ${id} could not be updated.`);

If it fails, it will print something likes actual: true, actual: false. This is not very clear since the result.rowCount could be zero or larger than one in this case.

If such assertion could be rewritten using assert.equal() to provide more precise error message, it would be very helpful for debugging.

Allow the job handler of subscribe() to process all fetched jobs at once

Currently the job handler function of subscribe() is called for each fetched job. It would be sweet if there is an option to allow the handler to receive an array of all fetched jobs. In my use case, doing a batch processing with all fetched jobs could help to significantly reduce system workload.

boss
  .subscribe(
    'my-queue', 
    { batchSize: 50 }, 
    (jobs) => batchProcess(jobs)
  );

This is the expected usage. Though it could be achieved by periodically calling fetch(), a native support is better (and implementing it looks simple after reading the source code).

Autoscaling approach for queue overload

Hi =)

awesome project and thx for making it public!!! <3

I have a question about auto-scaling.
We are testing pg-boss at the moment and everything looks fine, just the scaling part is a little bit hard. What is if there are more jobs in the queue as the worker can handle?

Our old setup on Amazon AWS:
App server publishes a notification(==job) to SNS -> SQS -> and a worker in beanstalk receives every job. If there are too much jobs it will auto-add new instances (scale).

We would now replace SNS/SQS with pg-boss to be more independent from AWS.
My first approach was to add a other "cron-like job" which checks the queue size every x min - and if there are too many unhandled jobs - it triggers a beanstalk upscale event.

Do you have any approach how to handle a queue overload?

What are you doing with "expired" jobs?

Hi,

if jobs are failing (however) they get retried (depending on the config, in my case 2 times) and getting the state "expired".
What do you use to track these "expired" jobs? or do you auto delete them somehow?

Boss subscriber does not work with a disconnection

Hi,
I'm trying perform a subscriber within ready function. And to publish the work within boss.connection. It's woring without disconnection, but with it not works.
Here is my snippet:

var koa=require('koa');
var PgBoss=require('pg-boss');
var boss=new PgBoss('postgress://globik:null@localhost:5432/postgres');
var app=koa();
boss.start().then(ready).catch(err=>console.log(err));

function ready(){
boss.subscribe('work', (job,done)=>{
console.log(job.name,job.id,job.data);
done().then(()=>console.log('confirmed done'))
})
}

// trying simulate a router call. Within router path
boss.connect().then(()=>{
// how many connections can one establish? 10? 20? 100?
console.log('Is connected');
boss.publish('work',{message:'sendPost'},{startIn:"6 seconds"}).then(jobid=>{
console.log(jobid);
boss.disconnect().then(()=>{console.log('Disconnected');});
// boss.subscribe('work') in the 'ready' function does not work
//without boss.disconnect() - works great
});
});
app.listen(process.env.PORT || 5000)
console.log('port 5000');

Is it normal when I will be using a publisher without disconnection?

Consider json instead of jsonb

I've found that in Postgres, json is both faster and more space efficient than jsonb. These are probably two more desirable traits than having Postgres validate that it's valid JSON.

Interested in your thoughts on this.

Add option for disabling prepared statements

It would be nice to have an option to disable the use of prepared statements.
We are using pgbouncer with transaction pooling, which does not support prepared statements, so it would be nice If we could simply disable them.

Support for querying jobs table to prevent duplicate jobs

Hello,

This looks like a nice library.
My use case is the following, I like to schedule a single message every 24 hours for active users.
Meaning if user came to the app and did some action, I like to schedule a message that will be sent to the user in 24 hours.

But I only want to have a single schedule message for every user in a 24 hour duration.

I was thinking querying the jobs table before creating a job, but I wonder if there's a better way?

Thanks,
Ran.

Use PostgreSQL client directly

I'm using pg-boss as a part of an express app. In that express app I use sequelize for interacting with the database. When I run pg-boss it does setup a new connection to my database. Can I setup pg-boss in such a way it uses the same database connection as sequelize does?

Suggestions for improved subscribe API

I think the subscribe API should be simplified, which would make it easier for users to understand and for you to maintain. The changes I'd vote for are:

  • Deprecate or remove convenience done method from job.
  • Make handler return a Promise, instead of than the classic Node.js (err: Error, success: any) => void callback.

An example of where I found the existing API confusing:

https://github.com/timgit/pg-boss/blob/master/docs/usage.md#expired-job

The payload is the same job object that the subscriber's handler function receives with id, name and data properties.

When I read this I thought it might mean that it includes the done callback too. But from looking at the code it looks like it doesn't, so technically you might consider it not the same job object that the subscriber's handler function receives.

It would also be great to be able to do something like throw a RetryJobError and have the job automatically retried later with exponential back-off.

Priorize jobs

Idea: wouldnt it be cool to give jobs a higher priority?

Now you do a simple FIFO by createdOn date.

Usecase:
I add jobs for doing some background calculations (non-critical)
I add jobs for sending push notifications to users (critical)

Now I would like to add a "high" priority to the push job.

This could be easy solved by adding an priority int field and change the order by in the nextJob()

Do you think this is useful?

Throttling the publisher is problematic

According to the documentation, the only throttling available is via a throttle period set upon publishing an event. The throttle only allows the publisher to post one task per throttle period, refusing to post the job to the queue otherwise. I'm seeing two problems with this approach:

(1) If the server needs to throttle, it's likely because the server is currently having trouble. It needs to throttle already-queued jobs in order to recover.

(2) Suppose I have an HTTP server queuing tasks in response to client requests. The server can't block the client request until the queue stops throttling. Instead, it has to queue the job and allow the next request to arrive. If the server is busy, the client will just have to wait longer for the job to be done. The client shouldn't be told, "Sorry the server is too busy to even register your request." So the job gets queued. The workaround would be to create a job-pending queue that feeds the actual job queue; the job's gotta go into a queue.

[docs] sinon mock of setIntermediate breaks subscribe

I just spent hours trying to figure out why job subscriptions weren't working when running in my mocha test harness.

Turns out my use of sinon to mock timers was interfering with the setImmediate call in Manager.

Others have had this problem with sinon; obviously it's not a pg-boss issue, but might be worth putting in the docs to save someone else tearing their hair out in future :-)

Cheers

Use with transactions in knex

I realize this is a question rather than an issue with the library (which appears to be awesome, by the way!), so please feel free to close if this isn't the place to ask. With that said, I'm interested in publishing a job within a knex transaction.

With Bookshelf, this would allow saving a model in the transaction (and related models that may be saved within the same transaction), and also publishing jobs (if the transaction succeeds).

Since pg-boss seems to maintain its own connection(s) to the database directly, I don't know if this is even possible, but I imagine you have a better idea than I do about how it might be done if it is.

worker: teamSize

General question:

my settings:

"newJobCheckInterval": "1000",
"teamSize": 40

If I have a teamSize of 40 for a worker do every worker query the database every 1000ms?
Are there 40 queries per second in this config?

or is there 1 query to get 40 jobs and then spread it to the worker processes?

thx

How to use together with routing system

Hi, I would be to know how to use pg-boss with routing in an typical application.
The problem that the start function called only ones. But the router calls are many times.
boss.start();
Router.get('/index', function(res,req){
boss.publish('work') ??...;
boss.subscribe(...)
});
Or should be boss.start() within a router path?
Or should I create a job within a router path using connect - disconnect API?

Suggestion: Concurrent jobs limit for subscribe api

Hey there, I've been using pg-boss for some simple worker server stuff and seems to work pretty well, good job!

So I have a worker server that I only want to run a single job concurrently at a time, and I'm kinda surprised there's not a config for that. I'm going to try doing it with fetch() but wanted to make a suggestion for the future.

Documentation for fetch is incorrect

The job/jobs returned from boss.fetch(name) do not have a done method, but rather it seems (according to the tests) that I must call boss.complete(job.id) to mark the job as completed.

https://github.com/timgit/pg-boss/blob/master/docs/usage.md#fetchname-batchsize

const jobName = 'email-daily-digest';
const batchSize = 20;

boss.fetch(jobName, batchSize)
  .then(jobs => {
    if(!jobs) return;

    console.log(`received ${jobs.length} ${jobName} jobs`);

    // our magical emailer knows what to do with job.data
    let promises = jobs.map(job => emailer.send(job.data).then(() => job.done()));
    
    return Promise.all(promises);      
  })
  .catch(error => console.log(error));

The above is an example for the fetch documentation in the usage docs, but it incorrectly shows a job being marked as completed with the done function.

While I don't understand why a job that was manually fetched has a different interface as one coming from subscribe, we should at least fix the documentation.

Note, I have a commit locally that fixes the documentation, but don't have proper permissions to push to origin.

Queue and job state

I have been looking at replacing a queue we use now (Bull based on redis) with boss(want to get rid of Redis for variety of reasons). I have couple questions. I have couple use cases where I need to provide progress of the jobs to the web clients, looking at the current schema this is not possible? Other simple case is that I have a admin panel that shows jobs as they get added and processed. Is there a way to grab info based on job id? Right now seems like only way to query DB directly. Is this the design you would suggest here

Thank a lot

Performance improvements

Hi,

we are currently testing pg-boss on a high volume event trigger machine. 1000 jobs added/sec + 10*80 worker.
And we see the SELECT to get the job is very unperformant without an index.
Adding following indexes improved it: pg cost 50000 => 14

CREATE INDEX job_createdon_id ON shard_01_pgboss.job (createdon, id); -- very important!!!
CREATE INDEX job_state ON shard_01_pgboss.job (state); -- sometimes pg query optimizer takes this index; e.g. if there are close to zero 'created' jobs

What was your highest tested jobs/sec?

I am having an error when trying to create a simple job

When I run this code:

async function setupJobs() {
  const postgresURL = config.get('datasources:postgresql:url');
  const boss = new PgBoss(postgresURL);
  await boss.start();
  log.debug('Ready');
  boss.publish('work', () => {
    console.log('STUFF');
  });
}

I get:

Web server listening at: http://0.0.0.0:3000
  collectai:boot:jobs:debug Ready +0ms
Unhandled rejection error: syntax error at or near "ON"
    at [object Object].Connection.parseE (/User/Seinh/Project/node_modules/pg-boss/node_modules/pg/lib/connection.js:543:11)
    at [object Object].Connection.parseMessage (/User/Seinh/Project/node_modules/pg-boss/node_modules/pg/lib/connection.js:370:17)
    at Socket.<anonymous> (/User/Seinh/Project/node_modules/pg-boss/node_modules/pg/lib/connection.js:109:22)
    at emitOne (events.js:77:13)
    at Socket.emit (events.js:169:7)
    at readableAddChunk (_stream_readable.js:146:16)
    at Socket.Readable.push (_stream_readable.js:110:10)
    at TCP.onread (net.js:523:20)
From previous event:
    at Db.execute (/User/Seinh/Project/node_modules/pg-boss/lib/db.js:36:20)
    at Db.executeSql (/User/Seinh/Project/node_modules/pg-boss/lib/db.js:27:25)
    at insertJob (/User/Seinh/Project/node_modules/pg-boss/lib/manager.js:225:29)
    at deferred (/User/Seinh/Project/node_modules/pg-boss/lib/manager.js:184:17)
From previous event:
    at Manager.publish (/User/Seinh/Project/node_modules/pg-boss/lib/manager.js:176:20)
    at PgBoss.publish (/User/Seinh/Project/node_modules/pg-boss/lib/index.js:137:41)
    at Object._callee$ (jobs.js:10:8)
    at tryCatch (/User/Seinh/Project/node_modules/regenerator-runtime/runtime.js:62:40)
    at GeneratorFunctionPrototype.invoke [as _invoke] (/User/Seinh/Project/node_modules/regenerator-runtime/runtime.js:336:22)
    at GeneratorFunctionPrototype.prototype.(anonymous function) [as next] (/User/Seinh/Project/node_modules/regenerator-runtime/runtime.js:95:21)
    at step (/User/Seinh/Project/server/boot/jobs.js:34:191)
    at /User/Seinh/Project/server/boot/jobs.js:34:368
    at run (/User/Seinh/Project/node_modules/core-js/modules/es6.promise.js:89:22)
    at /User/Seinh/Project/node_modules/core-js/modules/es6.promise.js:102:28
    at flush (/User/Seinh/Project/node_modules/core-js/modules/_microtask.js:18:9)
    at doNTCallback0 (node.js:407:9)
    at process._tickDomainCallback (node.js:377:13)
    at process.fallback (/User/Seinh/Project/node_modules/async-listener/index.js:450:15)

Deleting/clearing queue

Is there a way (beside connecting to PG and doing it manually) to delete all jobs using pg-boss api?

Changing schema name leads to an error

Hi Tim,

I was trying to change the schema name from pgboss to _queue the way you mention in the documentation:

{
    database: ...,
    host: ...,
    user: ...,
    password: ...,
    poolSize: 1,
    archiveCompletedJobsEvery: '2 days',
    schema: '_queue'
  }

This creates the DB as expected. There is a _queue schema with two tables in it job and version. On the second run, however, I get an error. This does not happen when I either leave schema out, or set it to pgboss.

Error:

{ error: operator does not exist: character varying < _queue.job_state
    at Connection.parseE (.../node_modules/pg/lib/connection.js:545:11)
    at Connection.parseMessage (./node_modules/pg/lib/connection.js:370:19)
    at Socket.<anonymous> (.../node_modules/pg/lib/connection.js:113:22)
    at emitOne (events.js:115:13)
    at Socket.emit (events.js:210:7)
    at addChunk (_stream_readable.js:266:12)
    at readableAddChunk (_stream_readable.js:253:11)
    at Socket.Readable.push (_stream_readable.js:211:10)
    at TCP.onread (net.js:587:20)
  name: 'error',
  length: 224,
  severity: 'ERROR',
  code: '42883',
  detail: undefined,
  hint: 'No operator matches the given name and argument type(s). You might need to add explicit type casts.',
  position: '121',
  internalPosition: undefined,
  internalQuery: undefined,
  where: undefined,
  schema: undefined,
  table: undefined,
  column: undefined,
  dataType: undefined,
  constraint: undefined,
  file: 'parse_oper.c',
  line: '728',
  routine: 'op_error' }

Do you maybe have an idea?
thanks!

`archive` event: name vs meaning

First of all, thanks for a nice lib :)

I've been wondering why job deletion is called "archive"? As far as I can tell from sources, it simply DELETEs rows, and I think it is a bit of a stretch to call it archiving; from the docs, I was under impression it moves old stuff to a separate table, something like pgboss.job_archive.

Same goes for configuration options (like archiveCompletedJobsEvery).

Good pool size?

Awesome library!
Weβ€˜re about to integrate it into our Postgre-centric Node.js Framework.

Since most hosted Postgres solutions have a connection limit, I am trying to figure out a good configuration setup.
What are your experiences with the connection pool. How many connections is the minimum? Would it theoretically work with one? What are the drawbacks?
Thanks!

cheers

Eugene

Option to schedule a job after a specific timestamp

I think it would be useful to me to have an API that allows a job to be scheduled at a specific time. At the moment I'm achieving this via startIn, but it relies on the clocks of the worker and the database being close.

I'd like to be able to pass a Date to pg-boss, and have it convert it to UTC and send that to Postgres.

Slight aside: why is job.singletonOn a timestamp instead of timestamptz?

Could not get fetch the jobs

Here is my demo below. I use boss.fetch but that return null.

const PgBoss = require('pg-boss');
const boss = new PgBoss('postgres://postgres:mysecretpassword@localhost/demo');

const onError = error => console.error(error);

boss.on('error', onError);

boss.start()
  .then(ready)
  .catch(onError);

function ready() {
  let id;
  boss.publish('demo', { param1: 'parameter1' }, { startIn: 100 })
    .then(jobId => console.log(`created demo ${jobId}`))
    .then(() => boss.fetch('demo', 10))
    .then((jobs) => console.log('jobs =', jobs))
    .catch(onError);

  boss.subscribe('demo', someJobHandler)
    .then(() => console.log('subscribed to demo'))
    .catch(onError);
}

function someJobHandler(job) {
  console.log(`received ${job.name} ${job.id}`);
  console.log(`data: ${JSON.stringify(job.data)}`);

  job.done()
    .then(() => console.log(`demo ${job.id} completed`))
    .catch(onError);
}

Is it possible to retry a job with delay and order ?

Hi,

I'm searching a way to retry a job with an exponential delay but while keeping the order. If I have the following queue ( with j1 and j2 some jobs)

// Queue
----------------------------------------
| j1 | j2 | .... <--- new jobs comming
----------------------------------------

I would like to execute j1 and retry maximum X times with an exponential delay before trying to execute j2. (It must be blocking or locked until j1 succeed or failed X times)

Is there an easy way to do it ?

New index for performance improvement

I tried some different indexes to improve our worker queries.
(they are currently the performance killer on our database πŸ™ˆ )

Current situation: We have ~4000 created jobs in our table with startIn set in the future.

EXPLAIN has cost of ~1770:
screenshot_2018-04-06_21_35_43

After creating this index, it was ~13:
πŸ‘‰ CREATE INDEX job_name ON shard_01_pgboss.job (name);

screenshot_2018-04-06_21_35_58

A lot faster πŸŽ‰

Postgres decides itself what index to use to get the optimal performance, so if you only have 10 jobs in the table, it might not use the name index.


does anyone else have different index setups?

Making job check interval a function of job type

newJobCheckInterval is configured for all of pg-boss, but different job types can have different interval requirements. Consider the following job types:

  • send health check to cluster node (might be every 10 minutes)
  • download an mp3 from a web site (might be once per minute)
  • delete a file (might be once per 100ms -- might be queued because many files)

It would be nice to not only configure this as a function of job type, but to be able to dynamically change it as a function of job type, in order to support throttling (see #4).

Bug: job object returned by fetch doesn't have done() method.

Seems like job.done() is not defined when using the fetch function. For example:

boss.fetch('queue_name').then(function(job) {  
  if (!job) return;
  job.done(); //done() is not a defined   
  return null;   
});   

As a workaround, I've been using boss.complete(job.id)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    πŸ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. πŸ“ŠπŸ“ˆπŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❀️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.