Comments (6)
Hello!
If I were solving this problem, I think I would do something like this:
- Whenever a change is made, I would set a known cache key with the current timestamp and then call a `viewRecalcWaitTask` (or some such). `viewRecalcWaitTask` gets called with a 120 second countdown (as you suggested), and all it does when it runs is check that known cache key. If the timestamp is more than 120 seconds old, it just calls the `viewRecalcTask` that does the recalculation and then immediately exits. If the timestamp is less than 120 seconds old, it just exits without calling the recalculation task.
- In the `viewRecalcTask`, record a timestamp as soon as it starts running. Then check the known cache value at the very end of the run. If the cache timestamp is later than the task's starting timestamp, then it knows that it needs to run again because an update happened while it was running. Then you can just start another `viewRecalcWaitTask` with a 120 second countdown without updating the known cache value.
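To make the two-task flow concrete, here is a rough sketch that runs without Celery. Everything in it is a stand-in I made up for illustration: the `cache` dict, the `LAST_CHANGE_KEY` name, the `scheduled` list that plays the role of the queue, and the explicit `now`/`started`/`finished` arguments that replace the real clock. A real version would use a cache backend and `apply_async(countdown=...)` instead.

```python
WAIT_SECONDS = 120
LAST_CHANGE_KEY = "view_recalc:last_change"  # hypothetical cache key

cache = {}      # stand-in for a real cache backend
scheduled = []  # stand-in for the Celery queue: (run_at, task_name)


def schedule(task_name, now, countdown):
    """Pretend to call task.apply_async(countdown=countdown)."""
    scheduled.append((now + countdown, task_name))


def on_change(now):
    """Called whenever a change is made: stamp the cache, queue a wait task."""
    cache[LAST_CHANGE_KEY] = now
    schedule("wait", now, WAIT_SECONDS)


def view_recalc_wait_task(now, recalc):
    """Trigger the recalculation only if no change arrived during the wait."""
    last_change = cache.get(LAST_CHANGE_KEY)
    # The simulated clock has no scheduling delay, so the boundary is
    # inclusive here; a real worker would always run slightly late.
    if last_change is not None and now - last_change >= WAIT_SECONDS:
        recalc(now)
        return True
    return False  # a newer change already queued its own wait task


def view_recalc_task(started, finished, do_work):
    """Do the work, then queue another wait task if a change arrived meanwhile."""
    do_work()
    if cache.get(LAST_CHANGE_KEY, started) > started:
        # Leave the cached timestamp alone; just wait again.
        schedule("wait", finished, WAIT_SECONDS)
        return True
    return False
```

With changes at t=0 and t=50, the wait task that fires at t=120 does nothing and the one at t=170 triggers the recalculation, which is the grouping behavior described above.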
I think that will result in the behavior you're looking for. Thoughts?
-Wes
from jobtastic.
And if you could think of a good way to include this kind of pattern in Jobtastic with an understandable API, that would be super-cool. Maybe a generic `groupedWaitTask` and a helper method to get/set the cache with the timestamp?
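As a rough idea of what such a get/set helper could look like, here is a small sketch. The `ChangeStamp` name, its methods, and the injectable `clock` are all hypothetical; nothing like this exists in Jobtastic, and it assumes a dict-like cache rather than a real cache client.

```python
import time


class ChangeStamp:
    """Hypothetical helper that tracks when a cache key was last touched."""

    def __init__(self, cache, key, clock=time.time):
        self.cache = cache  # any dict-like object (assumption)
        self.key = key
        self.clock = clock  # injectable so tests can control time

    def touch(self):
        """Record 'now' as the time of the latest change and return it."""
        now = self.clock()
        self.cache[self.key] = now
        return now

    def get(self):
        """Return the last recorded timestamp, or None if never touched."""
        return self.cache.get(self.key)

    def is_quiet(self, wait_seconds):
        """True if no change was recorded in the last wait_seconds."""
        last = self.get()
        return last is None or self.clock() - last >= wait_seconds

    def changed_since(self, timestamp):
        """True if a change was recorded after the given timestamp."""
        last = self.get()
        return last is not None and last > timestamp
```

The wait task would call `is_quiet(120)` before starting the recalculation, and the recalculation task would call `changed_since(start_time)` at the end of its run.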
This is my first cut...
- `apply_async` stores the time the Task should wait until in the cache and then calls `super`
- `run` checks if the wait time has been reached:
  - if it hasn't then it queues a new Task to run at the wait time and returns `None` so the Task is treated as SUCCESSFUL by Celery
  - if the wait time has been reached then it runs the task as normal
  - after running the task it sees if the task has been resubmitted while the job was running and if so it submits a new Task for the end of the current wait time.

Does this look sensible to you?
import logging
from datetime import datetime, timedelta

import pytz
from django.core.cache import cache  # assumption: the Django cache backend
from jobtastic import JobtasticTask

try:
    from celery.utils.log import get_task_logger
except ImportError:
    get_task_logger = None  # Celery 2.X


class GroupedWaitTask(JobtasticTask):
    """
    An extension of ``JobtasticTask`` that waits a specified number of seconds
    since the last time it was submitted before it runs.

    This supports tasks that must be run when a set of updates has been
    completed but you don't know when that will be.

    One example is refreshing a summary table (perhaps implemented as a
    materialized view): the refresh must happen in a timely manner after the
    user updates the underlying records, but will be out of date as soon as
    another update is made. Therefore, we want to wait for a short time after
    each update is made to see if there are any more updates, and then run the
    refresh task after the "wait period" expires.

    The following class members are required:

    * ``wait_seconds`` The number of seconds since the last time this task was
      submitted that must elapse before the task can run.
    """
    abstract = True
    ignore_result = True

    @classmethod
    def apply_async(self, args, kwargs, **options):
        """
        Store the timestamp of the most recent submission for this task
        """
        self._validate_required_class_vars()
        cache_key = self._get_cache_key(**kwargs)

        # Store expiry time for the wait period
        if 'eta' in options:
            wait = options['eta']
        elif 'countdown' in options:
            wait = datetime.now(pytz.utc) + timedelta(seconds=options['countdown'])
        else:
            wait = datetime.now(pytz.utc) + timedelta(seconds=self.wait_seconds)
        logging.info("Setting %s to wait until %s", self.name, wait)
        cache.set('wait:%s' % cache_key, wait)

        return super(GroupedWaitTask, self).apply_async(args, kwargs, **options)

    def run(self, *args, **kwargs):
        cache_key = self._get_cache_key(**kwargs)

        if get_task_logger:
            self.logger = get_task_logger(self.__class__.__name__)
        else:
            # Celery 2.X fallback
            self.logger = self.get_logger(**kwargs)

        # If we haven't reached the end of the waiting period then schedule a
        # new task for then and exit. The cache entry may be missing (for
        # example, evicted), in which case there is nothing to wait for.
        wait = cache.get('wait:%s' % cache_key)
        if (wait is not None and wait >= datetime.now(pytz.utc)
                and not self.request.is_eager):
            self.logger.info("Deferring %s until %s", self.__class__.__name__, wait)
            # Remove the existing herd protection because we want to submit a new task
            cache.delete('herd:%s' % cache_key)
            self.apply_async(args, kwargs, eta=wait)
            return None

        # We have reached the end of the wait period, so calculate the result
        result = super(GroupedWaitTask, self).run(*args, **kwargs)

        # If a task was submitted after we started calculating the result, then
        # submit a new job now
        new_wait = cache.get('wait:%s' % cache_key)
        if (new_wait is not None and (wait is None or new_wait > wait)
                and not self.request.is_eager):
            self.logger.info("Resubmitting %s for %s", self.__class__.__name__, new_wait)
            self.apply_async(args, kwargs, eta=new_wait)

        return result

    @classmethod
    def _validate_required_class_vars(self):
        """
        Ensure that this subclass has defined all of the required class
        variables.
        """
        required_members = (
            'wait_seconds',
        )
        for required_member in required_members:
            if not hasattr(self, required_member):
                raise Exception(
                    "GroupedWaitTasks must define a %s" % required_member)
        super(GroupedWaitTask, self)._validate_required_class_vars()
This looks great! It's probably worth some tests with mocking to ensure we're hitting the right paths, but from reading through, I think this will do what we want. It's also better than my proposed API. This being effectively a wrapper around the task that actually does the work is quite slick.
The only thing missing here (besides being a pull request) is documentation for the README plus (ideally) at least some kind of tests for regression purposes. Supporting multiple versions of Celery without those is pretty tough.
This is very cool!
Thanks
-Wes
I'm happy to turn it into a PR and do docs. I'd like to do tests too, but I'm not sure where to start. I tried writing tests using my normal (django-based) approach, but it doesn't work in the absence of a working Celery server, and my attempts to get one to run under the control of the Django test runner have been unsuccessful so far. If you could provide an example, I'll add other test cases to it.
I can completely relate to coming from a Django testing background.
This test might shed some light. I find `mock` to be very necessary when writing Celery tests in general, but especially when testing something like Jobtastic.
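As a rough illustration of the mocking approach (this is not taken from Jobtastic's test suite), the sketch below tests the deferral branch without a Celery worker or a real cache. `run_or_defer` is a hypothetical stand-in that mirrors the branch at the top of `GroupedWaitTask.run`, and both the task and the cache are `mock.Mock` objects.

```python
from datetime import datetime, timedelta, timezone
from unittest import mock


def run_or_defer(task, cache, cache_key, now):
    """Hypothetical stand-in for the deferral branch of GroupedWaitTask.run."""
    wait = cache.get('wait:%s' % cache_key)
    if wait is not None and wait >= now:
        # Still inside the wait period: requeue for the wait time and exit
        task.apply_async((), {}, eta=wait)
        return None
    return task.calculate_result()


def test_defers_until_wait_time():
    now = datetime(2024, 1, 1, tzinfo=timezone.utc)
    wait = now + timedelta(seconds=120)
    task = mock.Mock()
    cache = mock.Mock()
    cache.get.return_value = wait

    assert run_or_defer(task, cache, 'abc', now) is None
    cache.get.assert_called_once_with('wait:abc')
    task.apply_async.assert_called_once_with((), {}, eta=wait)
    task.calculate_result.assert_not_called()


def test_runs_after_wait_time():
    now = datetime(2024, 1, 1, tzinfo=timezone.utc)
    task = mock.Mock()
    task.calculate_result.return_value = 'done'
    cache = mock.Mock()
    cache.get.return_value = now - timedelta(seconds=1)

    assert run_or_defer(task, cache, 'abc', now) == 'done'
    task.apply_async.assert_not_called()
```

Against the real class, the same idea would apply by patching the cache and `apply_async` with `mock.patch` rather than passing mocks in directly.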
-Wes