

jobtastic - Celery tasks plus more awesome


Jobtastic makes your user-responsive long-running Celery jobs totally awesomer. Celery is the ubiquitous Python job queueing tool, and Jobtastic is a Python library that adds useful features to your Celery tasks. Specifically, these are features you probably want if the results of your jobs are expensive to compute or if your users need to wait while those results are computed.

Jobtastic gives you goodies like:

  • Easy progress estimation/reporting
  • Job status feedback
  • Helper methods for gracefully handling a dead task broker (delay_or_eager and delay_or_fail)
  • Super-easy result caching
  • Thundering herd avoidance
  • Integration with a celery jQuery plugin for easy client-side progress display
  • Memory leak detection in a task run

Make your Celery jobs more awesome with Jobtastic.

Why Jobtastic?

If you have user-facing tasks for which a user must wait, you should try Jobtastic. It's great for:

  • Complex reports
  • Graph generation
  • CSV exports
  • Any long-running, user-facing job

You could write all of the stuff yourself, but why?

Installation

  1. Install gcc and the python C headers so that you can build psutil.

On Ubuntu, that means running:

$ sudo apt-get install build-essential python-dev python2.7-dev python3.5-dev rabbitmq-server

On OS X, you'll need to run the "XcodeTools" installer.

  2. Get the project source and install it

    $ pip install jobtastic

Creating Your First Task

Let's take a look at an example task using Jobtastic:

from time import sleep

from jobtastic import JobtasticTask

class LotsOfDivisionTask(JobtasticTask):
	"""
	Division is hard. Make Celery do it a bunch.
	"""
	# These are the Task kwargs that matter for caching purposes
	significant_kwargs = [
		('numerators', str),
		('denominators', str),
	]
	# How long should we give a task before assuming it has failed?
	herd_avoidance_timeout = 60  # Shouldn't take more than 60 seconds
	# How long we want to cache results with identical ``significant_kwargs``
	cache_duration = 0  # Cache these results forever. Math is pretty stable.
	# Note: 0 means different things in different cache backends. RTFM for yours.

	def calculate_result(self, numerators, denominators, **kwargs):
		"""
		MATH!!!
		"""
		results = []
		divisions_to_do = len(numerators)
		# Only actually update the progress in the backend every 10 operations
		update_frequency = 10
		for count, divisors in enumerate(zip(numerators, denominators)):
			numerator, denominator = divisors
			results.append(numerator / denominator)
			# Let's let everyone know how we're doing
			self.update_progress(
				count,
				divisions_to_do,
				update_frequency=update_frequency,
			)
			# Let's pretend that we're using the computers that landed us on the moon
			sleep(0.1)

		return results

This task is very trivial, but imagine doing something time-consuming instead of division (or just a ton of division) while a user waited. We wouldn't want a double-clicker to cause this to happen twice concurrently, we wouldn't want to ever redo this work on the same numbers and we would want the user to have at least some idea of how long they'll need to wait. Just by setting those 3 member variables, we've done all of these things.

Basically, creating a Celery task using Jobtastic is a matter of:

  1. Subclassing jobtastic.JobtasticTask
  2. Defining some required member variables
  3. Writing your calculate_result method (instead of the normal Celery run() method)
  4. Sprinkling update_progress() calls in your calculate_result() method to communicate progress

Now, to use this task in your Django view, you'll do something like:

from django.shortcuts import render_to_response

from my_app.tasks import LotsOfDivisionTask

def lets_divide(request):
	"""
	Do a set number of divisions and keep the user up to date on progress.
	"""
	iterations = int(request.GET.get('iterations', 1000))  # That's a lot. Right?
	step = 10

	# If we can't connect to the backend, let's not just 500. k?
	result = LotsOfDivisionTask.delay_or_fail(
		numerators=range(0, step * iterations * 2, step * 2),
		denominators=range(1, step * iterations, step),
	)

	return render_to_response(
		'my_app/lets_divide.html',
		{'task_id': result.task_id},
	)

The my_app/lets_divide.html template will then use the task_id to query the task result all asynchronous-like and keep the user up to date with what is happening.

For Flask, you might do something like:

from flask import Flask, render_template, request

from my_app.tasks import LotsOfDivisionTask

app = Flask(__name__)

@app.route("/", methods=['GET'])
def lets_divide():
	iterations = request.args.get('iterations', 1000, type=int)
	step = 10

	result = LotsOfDivisionTask.delay_or_fail(
		numerators=range(0, step * iterations * 2, step * 2),
		denominators=range(1, step * iterations, step),
	)

	return render_template('my_app/lets_divide.html', task_id=result.task_id)

Required Member Variables

"But wait, Wes. What the heck do those member variables actually do?" You ask.

Firstly. How the heck did you know my name?

And B, why don't I tell you!?

significant_kwargs

This is key to your caching magic. It's a list of 2-tuples containing the name of a kwarg plus a function to turn that kwarg into a string. Jobtastic uses these to determine if your task should have an identical result to another task run. In our division example, any task with the same numerators and denominators can be considered identical, so Jobtastic can do smart things.

significant_kwargs = [
	('numerators', str),
	('denominators', str),
]

If we were living in Bizarro World, and only the numerators mattered for division results, we could do something like:

significant_kwargs = [
	('numerators', str),
]

Now tasks called with an identical list of numerators will share a result.
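
To make the idea concrete, here's a minimal sketch of how a cache key could be derived from significant_kwargs. The build_cache_key helper, its prefix argument and the choice of MD5 are assumptions made for illustration; Jobtastic's internals may well differ.

```python
import hashlib


def build_cache_key(prefix, significant_kwargs, task_kwargs):
    """Hypothetical helper: hash the stringified significant kwargs."""
    parts = []
    for name, to_string in significant_kwargs:
        # Only kwargs listed in significant_kwargs influence the key
        parts.append('%s=%s' % (name, to_string(task_kwargs[name])))
    digest = hashlib.md5('&'.join(parts).encode('utf-8')).hexdigest()
    return '%s:%s' % (prefix, digest)


significant_kwargs = [
    ('numerators', str),
    ('denominators', str),
]

# ``verbose`` is a made-up kwarg that is NOT significant
key_a = build_cache_key(
    'my_app.tasks.LotsOfDivisionTask',
    significant_kwargs,
    {'numerators': [2, 4], 'denominators': [1, 2], 'verbose': True},
)
key_b = build_cache_key(
    'my_app.tasks.LotsOfDivisionTask',
    significant_kwargs,
    {'numerators': [2, 4], 'denominators': [1, 2], 'verbose': False},
)
key_c = build_cache_key(
    'my_app.tasks.LotsOfDivisionTask',
    significant_kwargs,
    {'numerators': [6, 8], 'denominators': [1, 2], 'verbose': True},
)
```

Here key_a and key_b match because they only differ in a kwarg that isn't significant, while key_c differs because the numerators changed.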

herd_avoidance_timeout

This is the max number of seconds for which Jobtastic will wait for identical task results to be determined. You want this number to be on the very high end of the amount of time you expect to wait (after a task starts) for the result. If this number is hit, it's assumed that something bad happened to the other task run (a worker failed) and we'll start calculating from the start.
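
As a rough illustration of the timeout's role, here's a sketch of herd avoidance on top of a tiny in-memory cache. TinyCache and start_or_join are hypothetical names invented for this example, and the explicit now argument only exists so the behavior is easy to follow without real clocks.

```python
import time


class TinyCache(object):
    """Hypothetical in-memory stand-in for a real cache backend."""

    def __init__(self):
        self._data = {}

    def get(self, key, now=None):
        now = time.time() if now is None else now
        value, expires_at = self._data.get(key, (None, 0))
        return value if now < expires_at else None

    def set(self, key, value, timeout, now=None):
        now = time.time() if now is None else now
        self._data[key] = (value, now + timeout)


def start_or_join(cache, cache_key, new_task_id, herd_avoidance_timeout, now=None):
    """Return the task_id that callers should follow for this cache_key."""
    herd_key = 'herd:%s' % cache_key
    running_task_id = cache.get(herd_key, now=now)
    if running_task_id is not None:
        # An identical task is already running, so piggyback on it
        return running_task_id
    # Nobody is working on this yet, or the earlier marker has expired
    cache.set(herd_key, new_task_id, herd_avoidance_timeout, now=now)
    return new_task_id


cache = TinyCache()
first = start_or_join(cache, 'division', 'task-1', 60, now=0)
second = start_or_join(cache, 'division', 'task-2', 60, now=30)
third = start_or_join(cache, 'division', 'task-3', 60, now=61)
```

The second caller arrives inside the 60 second window and is pointed at task-1. The third arrives after the window has passed, so the earlier run is presumed dead and task-3 starts fresh.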

Optional Member Variables

These let you tweak the default behavior. Most often, you'll just be setting the cache_duration to enable result caching.

cache_duration

If you want your results cached, set this to a non-negative number of seconds. This is the number of seconds for which identical jobs should try to just re-use the cached result. The default is -1, meaning don't do any caching. Remember, JobtasticTask uses your significant_kwargs to determine what is identical.

cache_prefix

This is an optional string used to represent tasks that should share cache results and thundering herd avoidance. You should almost never set this yourself, and instead should let Jobtastic use the module.class name. If you have two different tasks that should share caching, or you have some very odd cache key conflict, then you can change this yourself. You probably don't need to.

memleak_threshold

Set this value to monitor your tasks for any runs that increase the memory usage by more than this number of Megabytes (the SI definition). Individual task runs that increase resident memory by more than this threshold get some extra logging in order to help you debug the problem. By default, it logs the following via standard Celery logging:

  • The memory increase
  • The memory starting value
  • The memory ending value
  • The task's kwargs

You can then grep for Jobtastic:memleak memleak_detected in your logs to identify offending tasks.

If you'd like to customize this behavior, you can override the warn_of_memory_leak method in your own Task.
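
The threshold comparison itself is simple arithmetic. Here's a small sketch of it, assuming memory readings in bytes and treating a negative threshold as "checking disabled"; both function names and that disabled convention are assumptions for illustration, not Jobtastic's API.

```python
def memory_increase_mb(start_bytes, end_bytes):
    """Resident memory growth in SI megabytes (1 MB = 1,000,000 bytes)."""
    return (end_bytes - start_bytes) / 1000000.0


def exceeds_memleak_threshold(start_bytes, end_bytes, memleak_threshold):
    """Hypothetical check: True when a run grew memory past the threshold."""
    if memleak_threshold < 0:
        # Assumption: a negative threshold means "don't check at all"
        return False
    return memory_increase_mb(start_bytes, end_bytes) > memleak_threshold


# A run that grew from 100 MB to 160 MB against a 50 MB threshold
leaky_run = exceeds_memleak_threshold(100000000, 160000000, 50)
# A run that grew from 100 MB to 140 MB against the same threshold
fine_run = exceeds_memleak_threshold(100000000, 140000000, 50)
```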

Method to Override

Other than tweaking the member variables, you'll probably want to actually, you know, do something in your task.

calculate_result

This is where your magic happens. Do work here and return the result.

You'll almost definitely want to call update_progress periodically in this method so that your users get an idea of how long they'll be waiting.

Progress feedback helper

This is the guy you'll want to call to provide nice progress feedback and estimation.

update_progress

In your calculate_result, you'll want to periodically make calls like:

self.update_progress(work_done, total_work_to_do)

Jobtastic takes care of handling timers to give estimates, and assumes that progress will be roughly uniform across each work item.

Most of the time, you really don't need ultra-granular progress updates and can afford to only give an update every N items completed. Since every update would potentially hit your CELERY_RESULT_BACKEND, and that might cause a network trip, it's probably a good idea to use the optional update_frequency argument so that Jobtastic doesn't swamp your backend with updated estimates no user will ever see.

In our division example, we're only actually updating the progress every 10 division operations:

# Only actually update the progress in the backend every 10 operations
update_frequency = 10
for count, divisors in enumerate(zip(numerators, denominators)):
	numerator, denominator = divisors
	results.append(numerator / denominator)
	# Let's let everyone know how we're doing
	self.update_progress(count, divisions_to_do, update_frequency=10)
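
If you're curious what a uniform-progress estimate boils down to, here's a minimal sketch. The estimate_progress and should_report functions are hypothetical and only mirror the behavior described in this section; they are not Jobtastic's own code.

```python
def estimate_progress(completed_count, total_count, elapsed_seconds):
    """Return (progress_percent, time_remaining), assuming uniform work items."""
    if total_count <= 0:
        return 0, -1
    progress_percent = int(100.0 * completed_count / total_count)
    if completed_count <= 0:
        # No finished items yet, so there is nothing to extrapolate from
        return progress_percent, -1
    seconds_per_item = elapsed_seconds / float(completed_count)
    time_remaining = seconds_per_item * (total_count - completed_count)
    return progress_percent, time_remaining


def should_report(completed_count, update_frequency=1):
    """Only touch the result backend every ``update_frequency`` items."""
    return completed_count % update_frequency == 0


# 250 of 1000 divisions finished after 25 seconds
percent, remaining = estimate_progress(250, 1000, 25.0)
# With update_frequency=10, items 10, 20, 30... trigger a backend write
reported_counts = [n for n in range(1, 31) if should_report(n, update_frequency=10)]
```

The -1 returned when nothing has finished yet lines up with the "no good estimate available" value that clients see for time_remaining.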

Using your JobtasticTask

Sometimes, your Task Broker just up and dies (I'm looking at you, old versions of RabbitMQ). In production, calling straight up delay() with a dead backend will throw an error that varies based on what backend you're actually using. You probably don't want to just give your user a generic 500 page if your broker is down, and it's not fun to handle that exception every single place you might use Celery. Jobtastic has your back.

Included are delay_or_eager and delay_or_fail methods that handle a dead backend and do something a little more production-friendly.

Note: One very important caveat with JobtasticTask is that all of your arguments must be keyword arguments.

Note: This is a limitation of the current significant_kwargs implementation, and totally fixable if someone wants to submit a pull request.

delay_or_eager

If your broker is behaving itself, this guy acts just like delay(). In the case that your broker is down, though, it just goes ahead and runs the task in the current process and skips sending the task to a worker. You get back a nice shiny EagerResult object, which behaves just like the AsyncResult you were expecting. If you have a task that realistically only takes a few seconds to run, this might be better than giving your users an error message.

This method uses async_or_eager() under the hood.

delay_or_fail

Like delay_or_eager, this helps you handle a dead broker. Instead of running your task in the current process, this actually generates a task result representing the failure. This means that your client-side code can handle it like any other failed task and do something nice for the user. Maybe send them a fruit basket?

For tasks that might take a while or consume a lot of RAM, you're probably better off using this than delay_or_eager because you don't want to make a resource problem worse.

This method uses async_or_fail() under the hood.
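
The difference between the two fallbacks can be sketched without Celery at all. Everything below (BrokerDown, send_or_run_locally, send_or_report_failure and the tuple return values) is made up for illustration; the real methods return Celery result objects and catch whatever your broker client actually raises.

```python
class BrokerDown(Exception):
    """Hypothetical stand-in for whatever error your broker client raises."""


def send_or_run_locally(send_to_broker, run_locally, **kwargs):
    """delay_or_eager-style fallback: run in-process if the broker is down."""
    try:
        return ('queued', send_to_broker(**kwargs))
    except BrokerDown:
        return ('eager', run_locally(**kwargs))


def send_or_report_failure(send_to_broker, **kwargs):
    """delay_or_fail-style fallback: hand back a failure result instead."""
    try:
        return ('queued', send_to_broker(**kwargs))
    except BrokerDown as error:
        return ('failure', str(error))


def dead_broker(**kwargs):
    # Simulates a broker that refuses every connection
    raise BrokerDown('broker unreachable')


def divide(numerator, denominator):
    return numerator / denominator


eager_outcome = send_or_run_locally(dead_broker, divide, numerator=6, denominator=3)
failed_outcome = send_or_report_failure(dead_broker, numerator=6, denominator=3)
```

In the eager case the work still happens, just in the calling process. In the fail case no work happens and the caller gets something it can treat like any other failed task.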

async_or_eager

This is a version of delay_or_eager() that exposes the calling signature of apply_async().

async_or_fail

This is a version of delay_or_fail() that exposes the calling signature of apply_async().

Client Side Handling

That's all well and good on the server side, but the biggest benefit of Jobtastic is useful user-facing feedback. That means handling status checks using AJAX in the browser.

The easiest way to get rolling is to use our sister project, jquery-celery. It contains jQuery plugins that help you:

  • Poll for task status and handle the result
  • Display a progress bar using the info from the PROGRESS state.
  • Display tabular data using DataTables.

If you want to roll your own, the general pattern is to poll a URL (such as the django-celery task_status view) with your task_id to get JSON status information and then handle the possible states to keep the user informed.

The jquery-celery jQuery plugin might still be useful as reference, even if you're rolling your own. In general, you'll want to handle the following cases:

PENDING

Your task is still waiting for a worker process. It's generally useful to display something like "Waiting for your task to begin".

PROGRESS

Your task has started and you've got a JSON object like:

{
	"progress_percent": 0,
	"time_remaining": 300
}

progress_percent is a number between 0 and 100. It's a good idea to give a different message if the percent is 0, because the time remaining estimate might not yet be well-calibrated.

time_remaining is the number of seconds estimated to be left. If there's no good estimate available, this value will be -1.

SUCCESS

You've got your data. It's time to display the result.

FAILURE

Something went wrong and the worker reported a failure. This is a good time to either display a useful error message (if the user can be expected to correct the problem), or to ask the user to retry their task.

Non-200 Request

There are occasions where requesting the task status itself might error out. This isn't a reflection on the worker itself, as it could be caused by any number of application errors. In general, you probably want to try again if this happens, but if it persists, you'll want to give your user feedback.
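
The state handling described above can be sketched as a single function. It's written in Python to match the rest of this README, though in a browser the same branching would live in your JavaScript. The describe_task_status name and the assumption that the payload carries its state under a "state" key are illustrative only.

```python
def describe_task_status(status):
    """Turn a polled status payload into a message for the user."""
    state = status.get('state')
    if state == 'PENDING':
        return 'Waiting for your task to begin'
    if state == 'PROGRESS':
        percent = status.get('progress_percent', 0)
        remaining = status.get('time_remaining', -1)
        if percent == 0 or remaining < 0:
            # The estimate isn't calibrated yet, so don't show a number
            return 'Your task has started'
        return '%d%% done, about %d seconds left' % (percent, remaining)
    if state == 'SUCCESS':
        return 'Done'
    if state == 'FAILURE':
        return 'Something went wrong. Please try again.'
    return 'Unknown state: %s' % state


message = describe_task_status(
    {'state': 'PROGRESS', 'progress_percent': 40, 'time_remaining': 120}
)
```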

Running The Test Suite

We use tox to run our tests against various combinations of Python/Django/Celery. We only officially support the combinations listed in our .travis.yml file, but we're working on (Issue 33) supporting everything defined in tox.ini. Until then, you can run tests against supported combos with:

$ pip install tox
$ tox -e py27-django1.8.X-djangocelery3.1.X-celery3.1.X

Our test suite currently only tests usage with Django, which is definitely a bug. Especially if you use Jobtastic with Flask, we would love a pull request.

Dynamic Time Estimates via JobtasticMixins

Have tasks whose duration is difficult to estimate or that don't have smooth progress? JobtasticMixins to the rescue!

JobtasticMixins provides an AVGTimeRedis mixin that stores duration data in a Redis backend. It then automatically uses this stored historical data to calculate an estimate. For more details, check out JobtasticMixins on GitHub.

Is it Awesome?

Yes. Increasingly so.

Project Status

Jobtastic is currently known to work with Django 1.6+ and Celery 3.1.X. The goal is to support those versions and newer. Please file issues if there are problems with newer versions of Django/Celery.

Gotchas

At the time of this writing, the latest supported version of kombu with celery 4.x is 4.0.2. This is due to an issue with invalid or temporarily broken brokers with the newer versions of kombu.

Also, RabbitMQ should be running in the background while running tests.

A note on usage with Flask

Previously, if you were using Flask instead of Django, the only supported way to work with Jobtastic was with Memcached as your CELERY_RESULT_BACKEND.

Thanks to @rhunwicks this is no longer the case!

A cache is now selected with the following priority:

  • If the Celery app config has a JOBTASTIC_CACHE setting and it is a valid cache, use it
  • If Django is installed, then:
    • If the setting is a valid Django cache entry, then use that.
    • If the setting is empty use the default cache
  • If Werkzeug is installed, then:
    • If the setting is a valid Celery Memcache or Redis Backend, then use that.
    • If the setting is empty and the default Celery Result Backend is Memcache or Redis, then use that

Non-affiliation

This project isn't affiliated with the awesome folks at the Celery Project (unless having a huge crush counts as affiliation). It's a library that the folks at PolicyStat have been using internally and decided to open source in the hopes it is useful to others.

jobtastic's People

Contributors

abbasovalex, caffodian, coagulant, deti, georgemarshall, jacobwegner, jlward, kylegibson, luzfcb, midnightlynx, olevinsky, rhunwicks, silviot, singingwolfboy, winhamwr, yazgoo


jobtastic's Issues

compatibility with Celery 4.0?

Hi,

with celery 4.0 around the corner, and me needing to implement a quite complex task based processing infrastructure which would benefit greatly from jobtastic's additional features, I wonder: is jobtastic compatible with celery 4.0? Or, if not, is it planned?

Sorry if this is not the right place to ask, could not find a specific jobtastic mail list or so.

If a result is cached, skip sending the Task to the broker

If all we're going to do is return a BaseAsyncResult matching the task_id that already completed this identical task, there's no reason to route it to the broker and wait for a worker to pick it up. Not only does this add some needless overhead, but in the case that all of the workers are busy, the user will be waiting for basically no reason.

Move the PolicyStat internal test suite to this repo and get it up on travis-ci

There are tests, I promise. Convert them over to this repo (using the division example) for Django. It would probably be good to get at least a couple of Flask-only tests as well, to ensure that Django continues to not be required.

Get these running on Travis-CI because travis is awesome.

It would be nice to get tests running against a range of python, django and celery versions as well, but that's probably a separate ticket.

Install Fails: clang: error: unknown argument: '-mno-fused-madd' [-Wunused-command-line-argument-hard-error-in-future]

✗ pip install jobtastic
Downloading/unpacking jobtastic
  Downloading jobtastic-0.2.2.tar.gz
  Running setup.py egg_info for package jobtastic

    no previously-included directories found matching 'jobtastic/*.pyc'
    no previously-included directories found matching 'test_projects/*.pyc'
Requirement already satisfied (use --upgrade to upgrade): celery>=2.5 in /Users/rbalfanz/.virtualenvs/proj/lib/python2.7/site-packages (from jobtastic)
Downloading/unpacking psutil (from jobtastic)
  Downloading psutil-2.0.0.tar.gz (207kB): 207kB downloaded
  Running setup.py egg_info for package psutil

    warning: no previously-included files matching '*' found under directory 'docs/_build'
Requirement already satisfied (use --upgrade to upgrade): pytz>dev in /Users/rbalfanz/.virtualenvs/proj/lib/python2.7/site-packages (from celery>=2.5->jobtastic)
Requirement already satisfied (use --upgrade to upgrade): billiard>=3.3.0.14,<3.4 in /Users/rbalfanz/.virtualenvs/proj/lib/python2.7/site-packages (from celery>=2.5->jobtastic)
Requirement already satisfied (use --upgrade to upgrade): kombu>=3.0.13,<4.0 in /Users/rbalfanz/.virtualenvs/proj/lib/python2.7/site-packages (from celery>=2.5->jobtastic)
Requirement already satisfied (use --upgrade to upgrade): anyjson>=0.3.3 in /Users/rbalfanz/.virtualenvs/proj/lib/python2.7/site-packages (from kombu>=3.0.13,<4.0->celery>=2.5->jobtastic)
Requirement already satisfied (use --upgrade to upgrade): amqp>=1.4.4,<2.0 in /Users/rbalfanz/.virtualenvs/proj/lib/python2.7/site-packages (from kombu>=3.0.13,<4.0->celery>=2.5->jobtastic)
Installing collected packages: jobtastic, psutil
  Running setup.py install for jobtastic

    no previously-included directories found matching 'jobtastic/*.pyc'
    no previously-included directories found matching 'test_projects/*.pyc'
  Running setup.py install for psutil
    building '_psutil_osx' extension
    cc -fno-strict-aliasing -fno-common -dynamic -arch x86_64 -arch i386 -g -Os -pipe -fno-common -fno-strict-aliasing -fwrapv -mno-fused-madd -DENABLE_DTRACE -DMACOSX -DNDEBUG -Wall -Wstrict-prototypes -Wshorten-64-to-32 -DNDEBUG -g -fwrapv -Os -Wall -Wstrict-prototypes -DENABLE_DTRACE -arch x86_64 -arch i386 -pipe -I/System/Library/Frameworks/Python.framework/Versions/2.7/include/python2.7 -c psutil/_psutil_osx.c -o build/temp.macosx-10.9-intel-2.7/psutil/_psutil_osx.o
    clang: error: unknown argument: '-mno-fused-madd' [-Wunused-command-line-argument-hard-error-in-future]
    clang: note: this will be a hard error (cannot be downgraded to a warning) in the future
    error: command 'cc' failed with exit status 1
    Complete output from command /Users/rbalfanz/.virtualenvs/proj/bin/python -c "import setuptools;__file__='/Users/rbalfanz/.virtualenvs/proj/build/psutil/setup.py';exec(compile(open(__file__).read().replace('\r\n', '\n'), __file__, 'exec'))" install --record /var/folders/r8/cpg4rbvj7s1d1477gpfx75rs11kswl/T/pip-ou2Aso-record/install-record.txt --single-version-externally-managed --install-headers /Users/rbalfanz/.virtualenvs/proj/include/site/python2.7:
    running install
    running build
    running build_py
    creating build
    creating build/lib.macosx-10.9-intel-2.7
    creating build/lib.macosx-10.9-intel-2.7/psutil
    copying psutil/__init__.py -> build/lib.macosx-10.9-intel-2.7/psutil
    copying psutil/_common.py -> build/lib.macosx-10.9-intel-2.7/psutil
    copying psutil/_compat.py -> build/lib.macosx-10.9-intel-2.7/psutil
    copying psutil/_psbsd.py -> build/lib.macosx-10.9-intel-2.7/psutil
    copying psutil/_pslinux.py -> build/lib.macosx-10.9-intel-2.7/psutil
    copying psutil/_psosx.py -> build/lib.macosx-10.9-intel-2.7/psutil
    copying psutil/_psposix.py -> build/lib.macosx-10.9-intel-2.7/psutil
    copying psutil/_pssunos.py -> build/lib.macosx-10.9-intel-2.7/psutil
    copying psutil/_pswindows.py -> build/lib.macosx-10.9-intel-2.7/psutil
    running build_ext
    building '_psutil_osx' extension
    creating build/temp.macosx-10.9-intel-2.7
    creating build/temp.macosx-10.9-intel-2.7/psutil
    creating build/temp.macosx-10.9-intel-2.7/psutil/arch
    creating build/temp.macosx-10.9-intel-2.7/psutil/arch/osx
    cc -fno-strict-aliasing -fno-common -dynamic -arch x86_64 -arch i386 -g -Os -pipe -fno-common -fno-strict-aliasing -fwrapv -mno-fused-madd -DENABLE_DTRACE -DMACOSX -DNDEBUG -Wall -Wstrict-prototypes -Wshorten-64-to-32 -DNDEBUG -g -fwrapv -Os -Wall -Wstrict-prototypes -DENABLE_DTRACE -arch x86_64 -arch i386 -pipe -I/System/Library/Frameworks/Python.framework/Versions/2.7/include/python2.7 -c psutil/_psutil_osx.c -o build/temp.macosx-10.9-intel-2.7/psutil/_psutil_osx.o
    clang: error: unknown argument: '-mno-fused-madd' [-Wunused-command-line-argument-hard-error-in-future]
    clang: note: this will be a hard error (cannot be downgraded to a warning) in the future
    error: command 'cc' failed with exit status 1

----------------------------------------
Cleaning up...
Command /Users/rbalfanz/.virtualenvs/proj/bin/python -c "import setuptools;__file__='/Users/rbalfanz/.virtualenvs/proj/build/psutil/setup.py';exec(compile(open(__file__).read().replace('\r\n', '\n'), __file__, 'exec'))" install --record /var/folders/r8/cpg4rbvj7s1d1477gpfx75rs11kswl/T/pip-ou2Aso-record/install-record.txt --single-version-externally-managed --install-headers /Users/rbalfanz/.virtualenvs/proj/include/site/python2.7 failed with error code 1 in /Users/rbalfanz/.virtualenvs/proj/build/psutil
Storing complete log in /Users/rbalfanz/.pip/pip.log

Herd Avoidance and Task Retry

We are using Herd Avoidance to prevent users from executing simultaneous executions of the same task. However, if there's an exception in the task and we retry from within the task, the retried task ends up hitting the herd avoidance as well. Is this working as designed? Shouldn't retried tasks be able to avoid the herd avoidance?

groups or subtask

Is it possible to use jobtastic for celery groups/subtasks? I would like to have a single jobID/progress calculator that runs a number of different queries that may be different each time but to cache these individual queries if they get called again sort of like this:

result = bulk_lookup.delay_or_fail( queries=list_of_queries )

Improve documentation for process of converting existing tasks to jobtastic

First, thank you for your brilliant work!

I just found your project, it really should've been a lifesaver, BUT now that I've implemented many many "ordinary" complex celery task classes, is there some convenient way for me to migrate to jobtastic easily?

Also, does it integrate with celery 3.0 well?

License mismatch

In setup.py you say that the software is licensed under the BSD license, but your license file is actually the MIT one.

Is there a correct one?

Thank you!

Move to Flask/Celery-style explicit jobtastic invocation

Instead of implicitly trying to detect the Django or Flask environment, take a page from Flask/Celery and use an explicitly-instantiated Jobtastic object, which takes configuration. Also use an optional environment variable (djcelery-style) to keep Django working.

Tests don't actually cover Celery 3.0.x

If you examine the travis test outputs, testing currently doesn't actually test using Celery 3.0.

django-celery>=3.0,<3.0.21 didn't have an upper cap on their celery dependency, so on a fresh dependency install it installs celery 3.1.

django-celery>=3.0.21,<3.1 has an upper cap of celery<3.1, so it uses celery 3.0.

When using celery 3.0, the broker fallback tests and progress tests start failing.
It turns out when you use with self.settings(CELERY_ALWAYS_EAGER=True): in those tests, that properly changes the app setting for the way celery 3.1 builds the app, but doesn't change the setting for celery 3.0. So the tasks all turn up PENDING instead of SUCCESS.

Either testing should be fixed to cover that celery version, or the decision should be made to stop supporting celery<3.1. Doing the latter might be preferable, since celery supports django out of the box now, without needing django-celery.

Add example template lets_divide.html to README

I'm not very familiar with jQuery plugins, and I'm trying to use jquery-celery with jobtastic for the first time.

It'd be great if README.md had an example template that could help folks like me to get started.

I'd be happy to do a PR for this, but I don't know how to create that example template ;-)

I'm guessing something like:

<html>
    <head>
        <script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js" ></script>
        <script type="text/javascript" src="https://raw.github.com/PolicyStat/jquery-celery/master/src/celery.js"></script>
    </head>
    <body>
        <div id="task_id" style="display: none;">
            {{ task_id }}
        </div>
    </body>
</html>

Add an optional `significant_version` to the cache busting

If a task's output format changes, the previous result might still be cached, even though the result should now be different even for the same significant_kwargs.

If a task has a significant_version property, include that in the cache key. That will allow developers to bump that number any time they need to invalidate old results because of a change in the task itself.
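
A rough sketch of what this proposal could look like follows. The versioned_cache_key function, the key layout and the example ReportTask prefix are all hypothetical; nothing here exists in Jobtastic.

```python
import hashlib


def versioned_cache_key(cache_prefix, significant_version, kwargs_string):
    """Hypothetical key builder that mixes a version number into the key."""
    raw_key = '%s:v%s:%s' % (cache_prefix, significant_version, kwargs_string)
    return hashlib.md5(raw_key.encode('utf-8')).hexdigest()


# Same task and same kwargs, before and after bumping the version
old_key = versioned_cache_key('my_app.tasks.ReportTask', 1, 'year=2014')
new_key = versioned_cache_key('my_app.tasks.ReportTask', 2, 'year=2014')
```

Because the keys differ, results cached under version 1 would simply stop being found once the task declares version 2.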

Magic value (-1) for cache TTL is not compatible with cachelib RedisCache.

Hi all!

This is maybe more a feature suggestion than an actual issue, please bear with me and I'll explain my case.

Since werkzeug cache is now deprecated and refactored to a separate library (cachelib) I have started using the RedisCache from that library instead of configuring it through werkzeug internals the way that jobtastic is currently expecting.

So far so good; I figured out the WrappedCache class and how to use that together with cachelib RedisCache and at first glance all is working.

Now here's my issue: I want to have cached results that never expire in Redis.
To achieve that with cachelib RedisCache one would send a TTL of -1 but in jobtastic however a TTL of -1 means "don't cache anything at all, ever" and hence nothing happens.

https://github.com/PolicyStat/jobtastic/blob/master/jobtastic/task.py#L353
https://github.com/pallets/cachelib/blob/master/cachelib/redis.py#L95

I get that we can't just change this magic value in jobtastic to mean the complete opposite of what it means today for backwards compatibility reasons but I guess the same could also be said about cachelib...

One solution I could imagine happening in jobtastic would be moving the "what does a TTL of -1 mean?"-logic out from the middle of run() and instead deal with it in its own function that we somehow could override in subclasses and change.
Maybe just like a "determine_ttl()" or perhaps a larger "dispatch_set()" function to optionally override with slightly more responsibilities.

Preferably for future compatibility purposes I also don't want to override the entire run()-function in my project to achieve this fairly tiny change in logic.

Another option I guess would be to have a config value of sorts in jobtastic to determine if -1 should mean never or always?

Do you have any thoughts on this?
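
For discussion, here is one shape the overridable hook could take. CachingTaskSketch, ForeverCachedTask and the (should_cache, ttl) return convention are hypothetical and only illustrate the "determine_ttl()" idea from this issue; they are not part of Jobtastic.

```python
class CachingTaskSketch(object):
    """Hypothetical base class; not Jobtastic's actual JobtasticTask."""

    cache_duration = -1  # Documented Jobtastic default: don't cache

    def determine_ttl(self):
        """Return (should_cache, ttl_to_pass_to_the_cache_backend)."""
        if self.cache_duration < 0:
            return False, None
        return True, self.cache_duration


class ForeverCachedTask(CachingTaskSketch):
    """For a backend where a TTL of -1 means "never expire"."""

    def determine_ttl(self):
        # Always cache, and pass the backend's own "forever" value through
        return True, -1


default_decision = CachingTaskSketch().determine_ttl()
forever_decision = ForeverCachedTask().determine_ttl()
```

With a hook like this, run() would only need to ask the task what to do, and a project could change the meaning of -1 without overriding run() itself.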

More task feedback

We have a very long-running task, doing many small-ish operations (upserts) in serial.
It'd be handy to know e.g. "last updated record x", "inserted record y"
Does anybody have any advice on the best way to go about that?

Handle REVOKED tasks

Currently, if a JobTastic task is REVOKED by Celery then the herd avoidance will prevent a new task being submitted until the herd avoidance timeout is up.

This is because apply_async just checks for the existence of cache.get('herd:%s' % cache_key) and doesn't check the status.

I think it should check for revoked tasks and remove the cache key for them so that a new task submission occurs:

from celery.states import PENDING, SUCCESS, REVOKED
        # Check for an in-progress equivalent task to avoid duplicating work
        task_id = cache.get('herd:%s' % cache_key)
        if task_id:
            task = self.AsyncResult(task_id)
            if task.state == REVOKED:
                logging.info('Removing existing revoked task: %s', task_id)
                cache.delete('herd:%s' % cache_key)
            else:
                logging.info('Found existing in-progress task: %s', task_id)
                return task

If you want me to do a pull request, can I also refactor _break_thundering_herd_cache so that it takes the cache key as a parameter? Then I could call it from apply_async to handle the cache.delete. Alternatively, I could store self.cache_key from apply_async the same way it is in run.

Protect users against passing Django QuerySets as kwargs to tasks

If you pass a Django QuerySet or a Django Model as an argument to a task, very bad things happen. They get pickled as a huge object that takes a lot of memory, which balloons the RAM used by both the broker and the worker and also makes the task very slow to run.

If Django exists in the jobtastic environment, assert that none of the kwargs are instances of a QuerySet or Model.

Also add a setting to disable this behavior, in case someone wants to optimize away the small amount of overhead this might entail.
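
A minimal sketch of what such a check could look like. The names assert_safe_kwargs, django_forbidden_types, forbidden_types and enabled are hypothetical and not part of jobtastic; the enabled flag stands in for the proposed setting, and the Django import is only attempted when Django is installed.

```python
def django_forbidden_types():
    """Return (QuerySet, Model) when Django is importable, else an empty tuple."""
    try:
        from django.db.models import Model, QuerySet
    except ImportError:
        return ()
    return (QuerySet, Model)


def assert_safe_kwargs(kwargs, forbidden_types=None, enabled=True):
    """Raise TypeError if any kwarg value is an instance of a forbidden type.

    `enabled` is a hypothetical stand-in for the setting that disables the check.
    """
    if not enabled:
        return
    if forbidden_types is None:
        forbidden_types = django_forbidden_types()
    if not forbidden_types:
        return  # nothing to check against (e.g. Django is not installed)
    for name, value in kwargs.items():
        if isinstance(value, forbidden_types):
            raise TypeError(
                'Task kwarg %r is a %s; pass a primary key or a list of ids instead'
                % (name, type(value).__name__)
            )
```

Passing forbidden_types explicitly keeps the check usable (and testable) without Django present.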

Allow passing kwargs/args to apply_async

The celery docs show that one can specify the queue a task uses with apply_async:

>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')

It would be nice to expose this in the delay_or_fail method. My use case is a view that exposes various Django model methods as web hooks. The tasks need to be routed to different queues based on how long they are expected to run. This needs the queue kwarg of apply_async, but that isn't exposed via delay_or_fail().

It would be handy if there were a way to specify the args/kwargs that delay_or_fail passes to apply_async.
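
One possible shape for this, sketched with a stand-in class so it runs without Celery. The apply_async_options parameter is hypothetical and not part of jobtastic's API, and the stub leaves out the broker-failure handling that the real delay_or_fail performs.

```python
class TaskStub(object):
    """Stand-in task that just records what apply_async receives."""

    def apply_async(self, args=None, kwargs=None, **options):
        return {'args': args, 'kwargs': kwargs, 'options': options}

    def delay_or_fail(self, apply_async_options=None, **kwargs):
        # Hypothetical extension: forward routing options such as `queue`
        # and `routing_key` through to apply_async unchanged.
        options = dict(apply_async_options or {})
        return self.apply_async(args=(), kwargs=kwargs, **options)


result = TaskStub().delay_or_fail(
    apply_async_options={'queue': 'feed_tasks', 'routing_key': 'feed.import'},
    url='http://cnn.com/rss',
)
```

Keeping the options in a separate dict avoids name clashes between task kwargs and apply_async options.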

Jobtastic tasks wait even when CELERY_TASK_ALWAYS_EAGER is True

Hi everyone,

I have a JobtasticTask like this:

result = SimulationTask.delay_or_fail(simulation_id=instance.id)

When I call this method in my tests, they time out because they are stuck here.

That's why we have CELERY_TASK_ALWAYS_EAGER which works perfectly for regular celery tasks.

Apparently there is some issue with JobtasticTasks...

I'm using:
django==2.0.5
celery==4.2.0
jobtastic==2.0.0
django_celery_results==1.0.1
django_celery_beat==1.1.1

Project configuration

Hi, I was investigating how to get jobtastic working, and I managed to create a sample project with two types of tasks. I could start any number of tasks concurrently. I added the EXACT SAME files to my product code base in a new package. Whenever I try to run a task, I get an exception (AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'). Here is the traceback:

Traceback (most recent call last):
  File "/home/kyousef/server_sdk/SDK/inmobly_sdk/bgqueue_tasks/test.py", line 2, in <module>
    odd_task = GenerateOdd.delay_or_fail(size=20)
  File "/usr/local/lib/python2.7/dist-packages/jobtastic/task.py", line 183, in delay_or_fail
    return self.apply_async(args=args, kwargs=kwargs)
  File "/usr/local/lib/python2.7/dist-packages/jobtastic/task.py", line 256, in apply_async
    logging.info('Current status: %s', task_meta.status)
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 394, in state
    return self._get_task_meta()['status']
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 339, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/base.py", line 307, in get_task_meta
    meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

Help will be much appreciated.

Add support for Celery 3.1

AppCase was moved from celery.tests.utils to celery.tests.case.

eager_tasks is also nowhere to be found.

Since Django is supported out of the box in 3.1, it's a bit tricky to handle, I think. Maybe with another test project?

I also tried running tests on Celery 3.0 but the CELERY_CACHE_BACKEND = 'memory' is unrecognized so I used CELERY_CACHE_BACKEND = 'locmem://' instead.

Then I encountered one failed test: test_sanity (jobtastic.tests.test_broker_fallbacks.BrokenBrokerTestCase).

Maybe my environment isn't properly set up though...

Add tests for usage with Flask

The test suite currently only runs with Django in existence, which is bad because we won't detect breakages for Flask or other non-Django Celery uses.

Django-Celery's setup.py is probably a good inspiration for how to fix this.

  1. Add a custom command (test_flask?) that runs some tests based on a Flask project.
  2. If we need to create a tiny Flask project to get the tests running, that can live at test_projects/flask or some such.
  3. Then we can use a tox environment with a requirements/flask_test.txt file to ensure things are kosher without Django even being installed.

Race condition in `apply_async` can break `thundering_herd` avoidance, queuing duplicate significant-matching tasks

apply_async has a race condition. If two equivalent tasks (as defined by significant_kwargs) are in apply_async at the same time, the behavior differs from the case where the second call starts only after the first has finished queuing its task.

Reproduction

  • Task1 enters apply_async, determines there's no already-computed or in-progress task.
  • Task2 enters apply_async, determines there's no already-computed or in-progress task.
  • Task1 acquires a cache lock and places that task on the queue
  • Task2 can't immediately acquire a cache lock.
    • In redis, the code blocks within redis until Task1 gives up its lock
    • In memcached, the code loops in python code until Task1 gives up its lock

Current, broken, behavior

  • Task2 acquires a cache lock and places the now-duplicated task on the queue

Desired behavior

  • Task2 acquires a cache lock, doesn't place a new task on the queue, and instead piggy-backs on Task1

Solution

Once we obtain the cache lock:

  • We need to once again check for:
    • A completed task
    • An in-progress herd-avoided task
    • A completed task (just in case the other running task went from herd-avoided to completed between the first 2 checks)
  • If we find any of those things, we should piggyback on the other task instead of going through with queuing another one
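
The re-check described above can be sketched as follows. This is an illustration only: a dict and a threading.Lock stand in for the shared cache and the cache lock, the 'result:%s' key name is an assumption (only 'herd:%s' appears in jobtastic's code quoted elsewhere on this page), and find_existing and submit are hypothetical helpers.

```python
import threading

cache = {}                      # stand-in for the shared cache
cache_lock = threading.Lock()   # stand-in for the cache lock
queued = []                     # stand-in for the broker queue


def find_existing(cache_key):
    """Return the id of a completed or in-progress equivalent task, if any."""
    # Completed first, then herd-avoided, then completed again in case the
    # running task finished between the first two lookups.
    return (
        cache.get('result:%s' % cache_key)
        or cache.get('herd:%s' % cache_key)
        or cache.get('result:%s' % cache_key)
    )


def submit(cache_key, new_task_id):
    existing = find_existing(cache_key)
    if existing:
        return existing
    with cache_lock:
        # Re-check now that we hold the lock: another caller may have queued
        # an equivalent task while we were waiting for it.
        existing = find_existing(cache_key)
        if existing:
            return existing
        cache['herd:%s' % cache_key] = new_task_id
        queued.append(new_task_id)
        return new_task_id
```

With this shape, Task2 piggybacks on Task1 even when both passed the first check before either held the lock.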

Add ability to ensure only 1 version of a task is running at a time, but still actually run subsequent jobs: Simultaneous Execution Prevention

Sometimes, we might want multiple very-similar tasks to run (we can't just drop them or use the result from the first task), but not at the same time. Jobtastic can't currently help with this type of synchronization.

Support types of jobs where we want to prevent them from running at the same time, but we still want to run the other jobs, later.

Simultaneous Execution Prevention

  • Add a simultaneous_execution_prevention_timeout option that defaults to 0 (off)
  • When a job with that setting on hits a worker, it tries to acquire a cache lock
  • If it gets the lock, it goes on its merry way, making sure to release the lock when it's done or if it crashes
  • If it doesn't get the lock, it immediately retries the task with a short delay. Can we figure out how to separate a "I'm waiting on simultaneous lock" retry from a "the actual task needs a retry" so that a user's max_retry settings are actually used? We want to keep retrying indefinitely if something else has the simultaneous execution lock, since we can rely on that cache timeout to keep a global timeout for all potential executions of this type of task.
  • If herd_avoidance is >0 (active) or cache_duration is >=0 (active), we should raise an exception if someone tries to also set simultaneous_execution_prevention_timeout to >0 (active). They won't play nice together and it was almost certainly someone misunderstanding the docs.
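
A rough sketch of the locking and validation steps in the list above, assuming a cache with an add() that only succeeds when the key is absent or expired. InMemoryCache, LockBusy, run_exclusively, validate_options and the 'sep:%s' key are all hypothetical names; a real implementation would turn LockBusy into a Celery retry with a short delay.

```python
import time


class InMemoryCache(object):
    """Tiny stand-in cache: add() succeeds only if the key is absent or expired."""

    def __init__(self):
        self._data = {}

    def add(self, key, value, timeout):
        now = time.time()
        entry = self._data.get(key)
        if entry is not None and entry[1] > now:
            return False
        self._data[key] = (value, now + timeout)
        return True

    def delete(self, key):
        self._data.pop(key, None)


class LockBusy(Exception):
    """Signals 'retry because of the lock', distinct from a real task failure."""


def validate_options(herd_avoidance_timeout, cache_duration, sep_timeout):
    """Reject combining execution prevention with herd avoidance or caching."""
    if sep_timeout > 0 and (herd_avoidance_timeout > 0 or cache_duration >= 0):
        raise ValueError(
            'simultaneous_execution_prevention_timeout cannot be combined '
            'with herd avoidance or result caching'
        )


def run_exclusively(cache, cache_key, timeout, func):
    lock_key = 'sep:%s' % cache_key
    if not cache.add(lock_key, 'locked', timeout):
        raise LockBusy(lock_key)
    try:
        return func()
    finally:
        # Release on success or crash; the timeout covers a killed worker.
        cache.delete(lock_key)
```

Using a dedicated exception for the lock case is one way to keep "waiting on the lock" retries separate from the user's max_retry accounting.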

Caveats to users based on countdown/eta/delay

This kind of thing can get you into a deadlock state with your queues. Because of the way worker prefetch_count, retry, and delay/eta/countdown interact, your retry call with a delay could block an entire pool of workers.

Let's say you have one worker pool with a concurrency of 3 and a prefetch_multiplier of 4. Then you queue up 13 jobs with simultaneous execution prevention turned on that all match via significant_kwargs. The first one to hit a worker will start running, and then the next 12 will get retried with a delay. Those will then immediately hang out in your worker pool, and since the pool only has 12 "slots" for tasks (3 concurrency times 4 prefetch_multiplier), and since the delay/eta/countdown happens at the worker pool level, the other 2 workers in your pool will have nothing to do. Even though you might be queuing up other jobs that could be run by those 2 workers, they can't get to them, because the pool has already pulled its max amount of jobs.

Could we mitigate this?

  • Maybe delay should be really fast, since retry does actually send things back to the broker? We'll potentially be churning through a lot of jobs that will just immediately retry after failure to acquire the lock, but that will at least let other jobs slip in between.

Uncaught exception "Connection refused"

Hello,

First of all thanks for the great work, JobTastic is really neat and handy.

I tried running delay_or_eager and delay_or_fail with a stopped broker (RabbitMQ), but it seems that the resulting error isn't caught.

What I get is a 500 "[Errno 111] Connection refused".

I'm working on a very simple test case with the bare minimum. My setup is Celery (3.0) along with JobTastic (0.2.2) and amqp (1.0.13).

I figure this shouldn't be a hard fix so let me know if you are aware of this issue or if I'm doing something wrong here. I'll be glad to contribute.

Hijack `self.backend` instead of trying to find a cache

The only requirement for a cache backend that will work with Jobtastic is that it has an atomic increment operation. Redis (and probably others) have this, so there's no reason we should limit the options to Memcached only. Ideally, #8 will be completed first so that we don't have to do a bunch of implicit configuration parsing, but at worst, we can look for Redis the same way we look for memcached.

Maybe there's a way to make this a bit more flexible though. It seems that all cache backends for both Django and Werkzeug must provide an incr method, regardless of whether it's atomic. There seem to be two options then:

  • Make it a documentation thing and call out the fact that non-atomic incr backends will not always protect you from thundering herd issues. We can let people use non-atomic incr backends this way, and they'll probably still improve things a majority of the time.
  • Use some kind of backend whitelist (which is the way things are currently implemented) that knows that Redis and Memcached are atomic, while the others aren't.

Of these two options, it seems like providing the freedom to use non-atomic backends is preferable.
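
To illustrate why atomicity of incr matters, here is a small sketch of a lock built on it. CounterCache and try_acquire are hypothetical; the stand-in cache creates missing keys at 0, whereas real backends may require the key to be initialized first.

```python
import threading


class CounterCache(object):
    """Stand-in cache whose incr() is atomic, like Memcached or Redis."""

    def __init__(self):
        self._values = {}
        self._mutex = threading.Lock()

    def incr(self, key):
        with self._mutex:
            self._values[key] = self._values.get(key, 0) + 1
            return self._values[key]

    def delete(self, key):
        with self._mutex:
            self._values.pop(key, None)


def try_acquire(cache, lock_key):
    """Only the caller that moves the counter from 0 to 1 gets the lock."""
    return cache.incr(lock_key) == 1
```

If incr were a non-atomic read-then-write, two callers could both read 0, both write 1, and both believe they hold the lock, which is the failure mode the first option would need to document.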

Add option to skip herd avoidance check, but still trigger avoidance for other tasks

I have a situation where I would like to use thundering herd avoidance conditionally.

It would be easy to support this with a conditional check at https://github.com/PolicyStat/jobtastic/blob/master/jobtastic/task.py#L242 and pop the key out of the options that get passed to the superclass.

Something like the following should do the trick:

        # Check for an in-progress equivalent task to avoid duplicating work
        if not options.pop('disable_thundering_herd_avoidance', False):
            task_id = cache.get('herd:%s' % cache_key)
            if task_id:
                logging.info('Found existing in-progress task: %s', task_id)
                return self.AsyncResult(task_id)

This would work nicely with #56

Stacktrace when using psutil 3.0.1

get_memory_info is just memory_info in psutil 3.0.1:

http://pythonhosted.org/psutil/index.html#psutil.Process.memory_info

  File "/home/ubuntu/virtualenvs/pstat_ticket_unittest_1/lib/python2.6/site-packages/jobtastic/task.py", line 183, in delay_or_fail
    return self.apply_async(args=args, kwargs=kwargs)
  File "/home/ubuntu/virtualenvs/pstat_ticket_unittest_1/lib/python2.6/site-packages/jobtastic/task.py", line 254, in apply_async
    **options
  File "/home/ubuntu/virtualenvs/pstat_ticket_unittest_1/lib/python2.6/site-packages/celery/app/task/__init__.py", line 445, in apply_async
    return self.apply(args, kwargs, task_id=task_id, **options)
  File "/home/ubuntu/virtualenvs/pstat_ticket_unittest_1/lib/python2.6/site-packages/celery/app/task/__init__.py", line 601, in apply
    request=request, propagate=throw)
  File "/home/ubuntu/virtualenvs/pstat_ticket_unittest_1/lib/python2.6/site-packages/celery/execute/trace.py", line 248, in eager_trace_task
    uuid, args, kwargs, request)
  File "/home/ubuntu/virtualenvs/pstat_ticket_unittest_1/lib/python2.6/site-packages/celery/execute/trace.py", line 181, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/ubuntu/virtualenvs/pstat_ticket_unittest_1/lib/python2.6/site-packages/jobtastic/task.py", line 358, in run
    begining_memory_usage = self._get_memory_usage()
  File "/home/ubuntu/virtualenvs/pstat_ticket_unittest_1/lib/python2.6/site-packages/jobtastic/task.py", line 448, in _get_memory_usage
    usage = current_process.get_memory_info()
AttributeError: 'Process' object has no attribute 'get_memory_info'
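
A minimal compatibility sketch that works with either method name. The helper name get_memory_info is hypothetical, and it takes the process object as a parameter so the example does not need psutil installed.

```python
def get_memory_info(process):
    """Call memory_info() when present, else fall back to get_memory_info()."""
    method = getattr(process, 'memory_info', None)
    if method is None:
        # Older psutil releases only expose the get_-prefixed name.
        method = process.get_memory_info
    return method()
```

In _get_memory_usage this would be called with the current psutil.Process instance.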

Add a helper for notifying (emailing?) the result of a task to a user on completion

For really long-running tasks, even a progress bar isn't all that user friendly. It still requires them to hang around with a browser window open. A really nice feature would be the ability to let users opt-in to some kind of notification when their task is done.

Jobtastic can do this with a keyword argument on the task combined with a JobtasticNotifierTask. The idea would be to add something like

allow_user_notification = True

on the task. Then, with a supplied Django View or Flask request handler, the user can opt in to being notified about the task from the page where they're waiting for results. When the original JobtasticTask finishes, it checks a known cache location (based off of cache_prefix) to see if any users have opted in to notifications. If they have, it fires off a JobtasticNotifierTask right before it finishes.

This task then checks the same cache location and uses the configured JOBTASTIC_NOTIFICATION_BACKENDS to notify the given user.

We can probably start with these backends for good coverage of common use-cases:

  • Flask-Mail backend that just accepts an email address
  • Django user email notification backend that takes a django.contrib.auth.models.User id and fires off an email to their user.email using Django's send_mail

Jobtastic Task uses the root logger

I thought that the best practice was to get the logger at the top of the module:

import logging
logger = logging.getLogger(__name__)

class JobtasticTask(Task):

    def apply_async(self, args, kwargs, **options):
        if task_id:
            logger.info('Found existing cached and completed task: %s', task_id)

Currently, JobtasticTask just uses the root logger in a few places (logging.info('Found existing cached and completed task: %s', task_id)) which makes it hard to create special logging configurations for Tasks.

Is there a reason to use the root logger, or can I create a PR to change them?

Add field with time of last progress updating to task result dictionary

I think it would be useful to know when the worker last reported progress.
For example:

self.update_state(None, PROGRESS, {
            "progress_percent": progress_percent,
            "time_remaining": time_remaining,
            "time_feedback": datetime.now(),
        })

If workers accidentally restart, you end up with tasks that already show some progress. But it's hard to tell whether any worker is still processing them or whether they should be assumed dead. I suppose the last feedback time may help determine whether a task is alive.

What do you think about this idea?
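
On the consuming side, a check against that field could look roughly like this. The helper is_stale and the max_silence threshold are hypothetical; the time_feedback key matches the example above, and depending on the result serializer the timestamp may need to be stored as an ISO string and parsed back first.

```python
from datetime import datetime, timedelta


def is_stale(meta, now, max_silence=timedelta(minutes=5)):
    """Return True when the last progress update is older than max_silence."""
    last_feedback = meta.get('time_feedback')
    if last_feedback is None:
        return True  # no timestamp recorded, so liveness cannot be confirmed
    return now - last_feedback > max_silence


# Example: a task that last reported ten minutes ago is treated as stale.
now = datetime(2020, 1, 1, 12, 0, 0)
stale = is_stale({'time_feedback': now - timedelta(minutes=10)}, now)
```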

Jobtastic with Flask

In my project I don't have Django; the backend is implemented using Flask.
When I try the example with Flask, I get a runtime error due to the lack of Django settings.

    from jobtastic import JobtasticTask
  File "/usr/local/lib/python2.7/dist-packages/jobtastic/__init__.py", line 17, in <module>
    from jobtastic.task import JobtasticTask # NOQA
  File "/usr/local/lib/python2.7/dist-packages/jobtastic/task.py", line 35, in <module>
    from django.core.cache import cache
  File "/usr/lib/python2.7/dist-packages/django/core/cache/__init__.py", line 69, in <module>
    if DEFAULT_CACHE_ALIAS not in settings.CACHES:
  File "/usr/lib/python2.7/dist-packages/django/conf/__init__.py", line 54, in __getattr__
    self._setup(name)
  File "/usr/lib/python2.7/dist-packages/django/conf/__init__.py", line 47, in _setup
    % (desc, ENVIRONMENT_VARIABLE))
django.core.exceptions.ImproperlyConfigured: Requested setting CACHES, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings.

Process finished with exit code 1

Will I be able to use Jobtastic?

Old psutil requirement clashes

The requirements pin psutil to an old version, below version 4.

The trouble is that some newer packages require a later version. In my case, I'm trying to use google-cloud, which requires: psutil<6.0dev,>=5.2.2

How do you handle POST -> REDIRECT -> GET sequence to pass task_id?

It is generally considered good practise to use POST for data changing requests, which in my case is done in the task. Also I always try to REDIRECT after a POST to a GET, so that if the user clicks refresh, the same task wouldn't be started.

How do you handle this, in combination with passing around the task_id?

Do you also put the task_id in the session?

Distinguish between PROGRESS and PENDING for herd avoidance

I want to use Jobtastic to manage the refresh of a PostgreSQL materialized view when the user updates the underlying tables. The user performs many updates in succession and refreshing the view takes about 5 minutes. Therefore, I want to:

  • submit the task with countdown=120 so that I wait for 2 minutes after the last update to see if there are any more updates before executing the task
  • if a new task is submitted during that countdown, then I reset the countdown (or replace the old task submission with the new one)
  • once the countdown is reached and the task execution starts, then the next update should put a new PENDING entry on the queue, that doesn't get executed until the current PROGRESS one is finished

Does that use case fit with the intention of Jobtastic?

I can see that at the moment it doesn't distinguish between PROGRESS and PENDING for herd avoidance and I would need that - so that the herd avoidance is on PENDING only.

Part of that change would also be to reset the countdown time on the task when exiting through the herd avoidance path.

My understanding of the acquire_lock code at the moment is that it prevents duplicate Tasks being submitted - I think we might need a separate lock along the lines of http://celery.readthedocs.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time to make sure that the PENDING task isn't executed until the PROGRESS one finishes.

I'm happy to work on PR for any required changes if you point me in the right direction.

Decorator all the things!

Instead of requiring subclassing of JobtasticTask, provide a jobtastictask decorator that uses the decorated method as calculate_result. This wouldn't allow things like overriding progress calculation, but for a big majority of tasks, you probably don't need that anyway. The configuration member variables can then be set as keyword arguments on the decorator.

HT: zphds
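
A rough sketch of how such a decorator might work. JobtasticTaskStub is a stand-in for the real JobtasticTask so the example runs without Celery, and the base parameter is hypothetical; the decorator builds a subclass whose calculate_result calls the decorated function and whose class attributes come from the keyword arguments.

```python
class JobtasticTaskStub(object):
    """Stand-in for jobtastic.JobtasticTask so the sketch runs without Celery."""

    significant_kwargs = []
    herd_avoidance_timeout = 0

    def calculate_result(self, **kwargs):
        raise NotImplementedError


def jobtastictask(base=JobtasticTaskStub, **config):
    """Build a task class whose calculate_result calls the decorated function."""
    def decorator(func):
        def calculate_result(self, **kwargs):
            return func(**kwargs)

        attrs = dict(config)  # configuration becomes class attributes
        attrs['calculate_result'] = calculate_result
        attrs['__doc__'] = func.__doc__
        return type(func.__name__, (base,), attrs)
    return decorator


@jobtastictask(
    significant_kwargs=[('numerator', str), ('denominator', str)],
    herd_avoidance_timeout=60,
)
def divide(numerator, denominator):
    """Divide one number by another."""
    return float(numerator) / denominator
```

After decoration, divide is a task class rather than a function, so overriding progress calculation would still require a regular subclass.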

Add Beaker caching support for usage with Pyramid

Does Jobtastic currently work in Pyramid? The documentation says it works in Django and Flask, which gives me the assumption it is fairly framework-agnostic (just like Celery). If there are known problems with it working in Pyramid that would be nice to know. Otherwise, if it does work, I'll go ahead and implement it, and write up an example.
