Giter Site home page Giter Site logo

Comments (6)

winhamwr avatar winhamwr commented on July 17, 2024

Hello!

If I was solving this problem, I think I would do something like this:

  • Whenever a change is made, I would set a known cache key with the current timestamp and then I would call a viewRecalcWaitTask (or some such)
  • viewRecalcWaitTask gets called with a 120 second countdown (as you suggested) and all it does when it runs is check that known cache key. If the timestamp is more than 120 seconds old, it just calls the viewRecalcTask that does the recalculation and then immediately exits. If the timestamp is less than 120 seconds old, it just exists without calling the recalculation task.
  • In the viewRecalcTask, record a timestamp as soon as it starts running. Then check the known cache value at the very end of the run. If the cache timestamp is later than the task's starting timestamp, then it knows that it needs to run again because an update happened while it was running. Then you can just start another viewRecalcWaitTask with a 120 second countdown without updating the known cache value.

I think that will result in the behavior you're looking for. Thoughts?

-Wes

from jobtastic.

winhamwr avatar winhamwr commented on July 17, 2024

And if you could think of a good way to include this kind of pattern in Jobtastic with an understandable API, that would be super-cool. Maybe a generic groupedWaitTask and a helper method to get/set the cache with the timestamp?

from jobtastic.

rhunwicks avatar rhunwicks commented on July 17, 2024

This is my first cut...

  • apply_async stores the time the Task should wait until in the cache and then calls super.
  • run checks if the wait time has been reached:
    • if it hasn't then it queues a new Task to run at the wait time and returns None so the Task is treated as SUCCESSFUL by Celery
    • if the wait time has been reached then it runs the task as normal
    • after running the task it sees if the task has been resubmitted while the job was running and if so it submits a new Task for the end of the current wait time.

Does this look sensible to you?

class GroupedWaitTask(JobtasticTask):
    """
    An extension of ``JobtasticTask`` that waits a specified number of seconds
    since the last time it was submitted before it runs.

    This supports tasks that must be run when a set of updates has been complete
    but you don't know when that will be.

    One example is refreshing a summary table (perhaps implemented as a
    materialized view): the refresh must happen in a timely manner after the
    user updates the underlying records, but will be out of date as soon as
    another update is made. Therefore, we want to wait for a short time after
    each update is made to see if there are any more updates, and then run the
    refresh task after the "wait period" expires.

    The following class members are required:

    * ``wait_seconds`` The number of seconds since the last time this task was
      submitted that must elapse before the task can run.
    """
    abstract = True
    ignore_result = True

    @classmethod
    def apply_async(self, args, kwargs, **options):
        """
        Store the timestamp of the most recent submission for this task
        """
        self._validate_required_class_vars()
        cache_key = self._get_cache_key(**kwargs)

        # Store expiry time for the wait period
        if 'eta' in options:
            wait = options['eta']
        elif 'countdown' in options:
            wait = datetime.now(pytz.utc) + timedelta(seconds=options['countdown'])
        else:
            wait = datetime.now(pytz.utc) + timedelta(seconds=self.wait_seconds)
        logging.info("Setting %s to wait until %s", self.name, wait)
        cache.set('wait:%s' % cache_key, wait)

        return super(GroupedWaitTask, self).apply_async(args, kwargs, **options)

    def run(self, *args, **kwargs):
        cache_key = self._get_cache_key(**kwargs)

        if get_task_logger:
            self.logger = get_task_logger(self.__class__.__name__)
        else:
            # Celery 2.X fallback
            self.logger = self.get_logger(**kwargs)

        # If we haven't reached the end of the waiting period then schedule a
        # new task for then and exit
        wait = cache.get('wait:%s' % cache_key)
        if wait >= datetime.now(pytz.utc) and not self.request.is_eager:
            self.logger.info("Deferring %s until %s", self.__class__.__name__, wait)
            # Remove the existing herd protection because we want to submit a new task
            cache.delete('herd:%s' % cache_key)
            self.apply_async(args, kwargs, eta=wait)
            return None

        # We have reached the end of the wait period, so calculate the result
        result = super(GroupedWaitTask, self).run(*args, **kwargs)

        # If a task was submitted after we started calculating the result, then
        # submit a new job now
        new_wait = cache.get('wait:%s' % cache_key)
        if new_wait > wait and not self.request.is_eager:
            self.logger.info("Resubmitting %s for %s", self.__class__.__name__, new_wait)
            self.apply_async(args, kwargs, eta=new_wait)

        return result

    @classmethod
    def _validate_required_class_vars(self):
        """
        Ensure that this subclass has defined all of the required class
        variables.
        """
        required_members = (
            'wait_seconds',
        )
        for required_member in required_members:
            if not hasattr(self, required_member):
                raise Exception(
                    "GroupedWaitTask's must define a %s" % required_member)
        super(GroupedWaitTask, self)._validate_required_class_vars()

from jobtastic.

winhamwr avatar winhamwr commented on July 17, 2024

This looks great! It's probably worth some tests with mocking to ensure we're hitting the right paths, but from reading through, I think this will do what we want. It's also better than my proposed API. This being effectively a wrapper around the task that actually does the work is quite slick.

The only think is missing here (besides being a pull request) is documentation for the README plus (ideally) at least some kind of tests for regression purposes. Supporting multiple versions of Celery without those is pretty tough.

This is very cool!

Thanks
-Wes

from jobtastic.

rhunwicks avatar rhunwicks commented on July 17, 2024

I'm happy to turn it into a PR and do docs. I'd like to do tests too, but I'm not sure where to start. I tried writing tests using my normal (django-based) approach, but it doesn't work in the absence of a working Celery server, and my attempts to get one to run under the control of the Django test runner have been unsuccessful so far. If you could provide an example, I'll add other test cases to it.

from jobtastic.

winhamwr avatar winhamwr commented on July 17, 2024

I can completely relate to coming from a Django testing background.

this test might shed some light. In general, I find mock to be very necessary when writing Celery tests in general, but especially when testing something like Jobtastic.

-Wes

from jobtastic.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.