mongoqueue's Introduction

mongoqueue

Properties

  • Isolation

    Do not let different consumers process the same message.

  • Reliability

    Do not let a failed consumer cause an item to be lost.

  • Atomic

    Operations on the queue are atomic.

Usage

A queue can be instantiated with a mongo collection and a consumer identifier. The consumer identifier helps distinguish multiple queue consumers that are taking jobs from the queue:

>> from pymongo import Connection
>> from mongoqueue import MongoQueue
>> queue = MongoQueue(
...   Connection(TEST_DB).doctest_queue,
...   consumer_id="consumer-1",
...   timeout=300,
...   max_attempts=3)

The MongoQueue class's timeout parameter specifies how long, in seconds, a job may be held by a consumer before it is considered failed.

A job that times out or errors more than max_attempts times is considered permanently failed and will no longer be processed.

New jobs/items can be placed in the queue by passing a dictionary:

>> queue.put({"foobar": 1})

An integer priority can also be specified when adding a job, which will cause the job to be processed before lower-priority items:

>> queue.put({"foobar": 0}, priority=1)

An item can be fetched out by calling the next method on a queue. This returns a Job object:

>> job = queue.next()
>> job.payload
{"foobar": 1}

The job class exposes some control methods on the job, for marking progress, completion, errors, or releasing the job back into the queue.

  • complete: Marks a job as complete and removes it from the queue.
  • error: Optionally takes a message. Releases the job back to the queue,
    increments its attempts, and stores the error message on the job.
  • progress: Optionally takes an integer progress count. Notes progress on the
    job and resets the lock timeout.
  • release: Releases a job back to the pool. The attempts counter is not modified.
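
The bookkeeping described by these four methods can be illustrated with a small in-memory model. This is only a sketch: ToyJob and all of its attribute names are hypothetical, and it does not reproduce mongoqueue's implementation, which keeps this state in MongoDB.

```python
import time


class ToyJob(object):
    """Hypothetical in-memory stand-in for a queue job (not mongoqueue's Job)."""

    def __init__(self, payload):
        self.payload = payload
        self.attempts = 0
        self.locked_at = time.time()  # set when a consumer takes the job
        self.last_error = None
        self.progress_count = 0
        self.state = "locked"

    def complete(self):
        # Finished: the job leaves the queue.
        self.state = "removed"

    def error(self, message=None):
        # Back to the queue, attempts incremented, message stored.
        self.attempts += 1
        self.last_error = message
        self.locked_at = None
        self.state = "queued"

    def progress(self, count=0):
        # Note progress and restart the lock timeout.
        self.progress_count = count
        self.locked_at = time.time()

    def release(self):
        # Back to the queue; the attempts counter is left untouched.
        self.locked_at = None
        self.state = "queued"
```

The point to notice is the difference between error and release: both return the job to the queue, but only error counts against max_attempts.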

As a convenience, the job supports the context manager protocol:

>> with job as data:
...   print(data['payload'])

{"foobar": 0}

If the context block exits without an exception, the job is marked complete; if an exception is raised, the error is stored on the job.
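
A minimal sketch of how such behavior maps onto Python's context manager protocol follows. ToyContextJob and its attributes are hypothetical names used for illustration only, and letting the exception propagate after recording it is an assumption of this sketch rather than documented library behavior.

```python
class ToyContextJob(object):
    """Hypothetical job illustrating the context manager protocol."""

    def __init__(self, data):
        self.data = data
        self.completed = False
        self.last_error = None

    def __enter__(self):
        # This is the value bound by "with job as data".
        return self.data

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.completed = True             # clean exit: mark complete
        else:
            self.last_error = str(exc_value)  # failure: record the error
        return False  # assumption: the exception still propagates


job = ToyContextJob({"payload": {"foobar": 0}})
with job as data:
    value = data["payload"]["foobar"]
```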

Inspired By

Running Tests

Unit tests can be run with

$ python setup.py nosetests

Changes

  • 0.6.0 - Feb 4th, 2013 - Isolate passed in data from metadata in Job.
  • 0.5.2 - Dec 9th, 2012 - Fix for regression in sort parameters from pymongo 2.4
  • 0.5.1 - Dec 2nd, 2012 - Packaging fix for readme data file.

Credits

  • Kapil Thangavelu, author & maintainer
  • Dustin Laurence, sort fix for pymongo 2.4
  • Jonathan Sackett, Job data isolation.

mongoqueue's Issues

Should add some indexes to speed things up

Pulling tasks via next() is very slow; I eventually found that there are no indexes at all.
After I added the following indexes, it is fast now.
collection.ensure_index([('priority', ASCENDING)])
collection.ensure_index([('locked_by', ASCENDING), ('locked_at', ASCENDING), ('attempts', ASCENDING)])
collection.ensure_index([('_id', ASCENDING), ('locked_by', ASCENDING)])
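
On newer PyMongo releases ensure_index is deprecated (and removed in PyMongo 4) in favor of create_index. A hedged sketch of the same three indexes using create_index is shown below. The helper name create_queue_indexes is hypothetical, and ASCENDING is defined locally with the same value as pymongo.ASCENDING so that the snippet does not need a running server to be read or tested.

```python
ASCENDING = 1  # same value as pymongo.ASCENDING

# The three key specifications suggested in the issue above.
QUEUE_INDEXES = [
    [("priority", ASCENDING)],
    [("locked_by", ASCENDING), ("locked_at", ASCENDING), ("attempts", ASCENDING)],
    [("_id", ASCENDING), ("locked_by", ASCENDING)],
]


def create_queue_indexes(collection):
    """Create each index on the given collection.

    Returns whatever create_index returns for each specification
    (with PyMongo, the index names).
    """
    return [collection.create_index(keys) for keys in QUEUE_INDEXES]
```

With a real PyMongo collection this would be called once at startup, for example create_queue_indexes(queue.collection), assuming the queue exposes its collection under that attribute as the later issue snippets suggest.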

Status of library

Is this library still being developed, or is it better to use another one? My project is in Python 3 and this library does not run on it. The problem is that I use AWS Lambda and I can't modify the code.

default timeout too long > 300 days

The default timeout is int: 300

See: https://github.com/kapilt/mongoqueue/blob/master/mongoqueue/mongoqueue.py#L34

It is used to find locked jobs which have expired here:
https://github.com/kapilt/mongoqueue/blob/master/mongoqueue/mongoqueue.py#L66

Passing a single positional argument to timedelta means it is interpreted as days.

The only way to get sub-day timeouts is to use a fraction of a day.

See some examples here:

>>> from datetime import datetime, timedelta
>>> datetime.now()
datetime.datetime(2019, 6, 4, 17, 54, 52, 339957)

>>> datetime.now() - timedelta(300) # the default
datetime.datetime(2018, 8, 8, 17, 55, 4, 593967)

>>> datetime.now() - timedelta(1) # 1 day
datetime.datetime(2019, 6, 3, 17, 55, 12, 929884)

>>> datetime.now() - timedelta(0.04166667) # 1 hour as decimal days
datetime.datetime(2019, 6, 4, 16, 55, 39, 443801)

Please consider defaulting to a lower time-base like hours.

I had assumed it was minutes. (5hrs)
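
The difference can be checked with the standard library alone. The snippet below compares the positional form with an explicit seconds keyword; using seconds= is one possible fix suggested here, not something confirmed against the library's code.

```python
from datetime import timedelta

timeout = 300

as_days = timedelta(timeout)             # first positional argument is days
as_seconds = timedelta(seconds=timeout)  # explicit unit

print(as_days.total_seconds())     # 25920000.0
print(as_seconds.total_seconds())  # 300.0
```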

repair only affects one job

The use of find_and_modify means only one job is recovered from a stale lock per call.

The same query and update expressions work with the PyMongo function update_many, which updates all documents that match.

it = q.collection.update_many(
    {"locked_by": {"$ne": None},
     "locked_at": {"$lt": datetime.now() - timedelta(0.125)}},
    {"$set": {"locked_by": None, "locked_at": None},
     "$inc": {"attempts": 1}})
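
The same idea can be sketched as a small helper that only builds the filter and update documents, which keeps it testable without a database. The function name stale_lock_update and the choice of a timeout in seconds are assumptions made for this illustration; the field names are taken from the snippet above.

```python
from datetime import datetime, timedelta


def stale_lock_update(timeout_seconds, now=None):
    """Return (filter, update) documents that release every expired lock."""
    now = now or datetime.now()
    cutoff = now - timedelta(seconds=timeout_seconds)
    query = {"locked_by": {"$ne": None}, "locked_at": {"$lt": cutoff}}
    update = {
        "$set": {"locked_by": None, "locked_at": None},
        "$inc": {"attempts": 1},
    }
    return query, update


# With a real PyMongo collection (not executed here):
#   query, update = stale_lock_update(300)
#   result = collection.update_many(query, update)
#   result.modified_count  # number of jobs released
```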

'Collection' object is not callable

>>> queue = MongoQueue(Connection('localhost').doctest_queue, consumer_id='consumer-1', timeout=300, max_attempts=3)
>>> queue.put({"foobar": 1})
TypeError: 'Collection' object is not callable. If you meant to call the 'insert' method on a 'Database' object it is failing because no such method exists.

Attempts counter resets every time on queue.next()

  1. Defined the queue:
    queue = MongoQueue(
    ... Connection(TEST_DB).doctest_queue,
    ... consumer_id="consumer-1",
    ... timeout=300,
    ... max_attempts=3)

  2. Then got a job: job = queue.next()

  3. The job had an error: job.error("Error"). Here the attempts counter was incremented to 2.

  4. Then got the same job again: job = queue.next(). The attempts counter was reset to 1.

Question: how will the job be marked as failed if the attempts counter gets reset?
