
clokep / celery-batches


Celery Batches allows processing of multiple Celery task requests together

Home Page: https://celery-batches.readthedocs.io/

License: Other

Python 100.00%
python celery celery-task celery-tasks batch-task

celery-batches's People

Contributors

acdha, ask, clokep, dependabot[bot], graingert, kinoute, montasaurus, nijel, pmickael, scalen, stegayet, tony, weetster, wil-langford


celery-batches's Issues

Support for celery 5.4

Hello, celery 5.4 has been released, but setup.py requires celery<5.4. Would you like any help testing/supporting 5.4?

Proper way to pass more than one positional argument

Hi,

I'm using celery-batches and following the example in the documentation, modified to fit my needs.
I want to call the decorated function as get_batch_result.delay(char, font), but I don't know how to declare the function's parameters accordingly.
When I try def get_batch_result(requests), the following error occurs when the task comes in:

TypeError: get_batch_result() takes 1 positional argument but 2 were given

If I try def get_batch_result(requests1, requests2), the task comes in and the worker prints:

TypeError: get_batch_result() missing 1 required positional argument: 'requests2'

I also tried def get_batch_result(*requests). Now requests is a tuple containing a single list of SimpleRequest objects. The tuple wrapper doesn't match what the documentation describes, so I'm not sure I'm doing this right.
Is there a standard way to do this, or should I pack my arguments into a list, tuple, or kwargs as a workaround for now?
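The celery-batches documentation describes the batch function as taking a single parameter, the list of SimpleRequest objects, with each call's positional arguments in request.args and keyword arguments in request.kwargs. A minimal sketch of unpacking (char, font) under that assumption; FakeRequest is a hypothetical stand-in so the snippet runs without a worker, and it does not explain where the TypeError above comes from.

```python
from dataclasses import dataclass, field


@dataclass
class FakeRequest:
    """Hypothetical stand-in with the id/args/kwargs attributes of SimpleRequest."""

    id: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


def get_batch_result(requests):
    """Batch body: one parameter holding every buffered call.

    In a real worker this would be decorated with @app.task(base=Batches, ...)
    and would report each result with
    app.backend.mark_as_done(request.id, value, request=request).
    """
    results = []
    for request in requests:
        # Positional args of one get_batch_result.delay(char, font) call.
        char, font = request.args
        # Keyword arguments, if any were passed, would be in request.kwargs.
        results.append((request.id, char, font))
    return results


batch = [FakeRequest("t1", ("a", "Arial")), FakeRequest("t2", ("b", "Helvetica"))]
print(get_batch_result(batch))
```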

Batches task doesn't trigger next task in chain

I'm trying to use celery-batches in chain, like this:

chain(
	task_foo.s(*args, **kwargs),
	batch_task.s(*args, **kwargs),
	task_bar.s(*args, **kwargs),
).apply_async()

Celery never registers task_bar as a child of batch_task, and the chain stops there. What could be the cause, and how can I fix it?

flower support

Hello!
How can I use it with Flower?
"create_task" shows up in Flower just fine, but I can't see the "count_click" tasks there.
What am I doing wrong?
I suspect I'm using the wrong namespaces or startup arguments.

from .celery import celery_app
import time

from celery_batches import Batches, SimpleRequest
from collections import Counter

@celery_app.task(name="create_task")
def create_task(task_type):
    time.sleep(int(task_type) * 2)
    return True


@celery_app.task(base=Batches, flush_every=5, flush_interval=2)
def count_click(requests: list[SimpleRequest]):
    """Count the number of times each URL is requested."""
    clicks = Counter(request.kwargs["click"] for request in requests)
    for url, n in clicks.items():
        print(f">>> Clicks: {url} -> {n}")
    for request in requests:
        celery_app.backend.mark_as_done(request.id, 'hi', request=request)

A working example would be appreciated.

Loosen celery version constraint?

Celery 5.3 is out as of June 6th, 2023 🎉!

@clokep: Are you open to removing the <5.3 constraint completely and publishing the celery-batches to PyPI with a version bump (e.g. 0.7.1)?

History

f261a90#diff-fa602a8a75dc9dcc92261bac5f533c2a85e34fcceaff63b3a3a81d9acde2fc52R33

#36

Poetry constraint error when celery-batches @ 0.7.0 + celery 5.3.0
โฏ poetry update celery
Updating dependencies
Resolving dependencies... (11.8s)

Because celery-batches (0.7) depends on celery (>=4.4,<5.3)
 and no versions of celery-batches match >0.7.0,<0.8.0, celery-batches (>=0.7.0,<0.8.0) requires celery (>=4.4,<5.3).
So, because eduflow depends on both celery (~5.3.0) and celery-batches (~0.7.0), version solving failed.
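The resolver error follows from the half-open range: celery (>=4.4,<5.3) admits 5.2.x but not 5.3.0, because 5.3.0 is not strictly less than 5.3. A simplified check that handles only plain numeric versions (no pre-releases or other PEP 440 forms; real tools use packaging.specifiers); parse and satisfies are hypothetical helpers:

```python
def parse(version):
    """'5.3.0' -> (5, 3); trailing zeros are dropped so 5.3 and 5.3.0 compare equal."""
    parts = [int(p) for p in version.split(".")]
    while parts and parts[-1] == 0:
        parts.pop()
    return tuple(parts)


def satisfies(version, lower, upper):
    """True if lower <= version < upper (the shape of celery>=4.4,<5.3)."""
    return parse(lower) <= parse(version) < parse(upper)


print(satisfies("5.2.7", "4.4", "5.3"))  # → True
print(satisfies("5.3.0", "4.4", "5.3"))  # → False
```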

docs: Ambiguity / shadowing in doc example

Hi! Thank you for the project @clokep.

I'm confused by the docs example, which mixes these similarly named things:

the requests package vs Celery's celery.worker.request vs celery-batches's own celery_batches.SimpleRequest vs the example's requests parameter in wot_api(requests)

import requests
from urllib.parse import urlparse

from celery_batches import Batches

wot_api_target = 'https://api.mywot.com/0.4/public_link_json'

@app.task(base=Batches, flush_every=100, flush_interval=10)
def wot_api(requests):
    sig = lambda url: url
    responses = wot_api_real(
        (sig(*request.args, **request.kwargs) for request in requests)
    )
    # use mark_as_done to manually return response data
    for response, request in zip(responses, requests):
        app.backend.mark_as_done(request.id, response, request=request)


def wot_api_real(urls):
    domains = [urlparse(url).netloc for url in urls]
    response = requests.get(
        wot_api_target,
        params={'hosts': ('/').join(set(domains)) + '/'}
    )
    return [response.json()[domain] for domain in domains]

The requests package is imported, but the same name is also used as the parameter of def wot_api(requests).

Proposal:

To avoid the ambiguity, we could write an example that doesn't use the requests package at all (as ubiquitous as it is, its name happens to collide in this example 😆).

@clokep Are you open to collaborating with me on improving the docs? I'm willing to spend a few hours cleaning up (but not sure if you have your own ideas in mind)
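One way to follow the proposal: an example with no requests package at all, where the batch parameter is named batch so nothing is shadowed. A minimal sketch; count_domains and FakeRequest are hypothetical, and in a real task the function would carry @app.task(base=Batches, ...) and report results with mark_as_done.

```python
from collections import Counter
from dataclasses import dataclass, field
from urllib.parse import urlparse


@dataclass
class FakeRequest:
    """Hypothetical stand-in for celery_batches.SimpleRequest (id, args, kwargs)."""

    id: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


def count_domains(batch):
    """Count domains across one batch; each call was made as count_domains.delay(url)."""
    return Counter(urlparse(request.args[0]).netloc for request in batch)


batch = [
    FakeRequest("1", ("https://example.com/a",)),
    FakeRequest("2", ("https://example.com/b",)),
    FakeRequest("3", ("https://python.org/",)),
]
print(count_domains(batch))
```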

Add "How does it work?" section with a little explanation

Hi! I was looking through Celery plugins on GitHub and found celery-batches, but I didn't quite understand how it actually works.
It would be great to add a "How does it work?" section to README.md, like the one in https://github.com/steinitzu/celery-singleton#how-does-it-work

For instance, I have a typical HTTP API setup:

  1. HTTP API service: gunicorn runs Django with the Celery app inside (not a worker, only a publisher). gunicorn runs 5 processes of the Django HTTP app, and Django is the only code that calls Celery tasks, like this: last_seen_user.delay(user=user_id, when=utcnow()).
  2. Celery workers: run with celery --concurrency=4

How does celery-batches work in this setup?

The documentation says:

Task requests are buffered in memory (on a worker) until either the flush count or flush interval is reached.

Does this mean task requests are buffered inside whichever of the 5 Django processes handled the HTTP request?
If a user sends two HTTP requests and gunicorn routes them to different processes, there would be two local (in-memory) queues for this batch task, even for the same user. After flush_interval or flush_every (counted only for THAT process, not across all of them), each process would send its own task to the Celery broker, so in our case we would send two tasks even though both came from the same user.
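Per the documentation line quoted above, buffering happens on a worker, not in the publishing process: in this setup each .delay() call in any gunicorn process sends its own message to the broker, and the worker collects those messages in memory until flush_every is reached or flush_interval elapses. Batching does not merge duplicates, so two calls for the same user become two entries. BatchBuffer below is a simplified, hypothetical model of that count-or-time rule, not celery-batches' actual implementation (which relies on the worker's own timer).

```python
import time


class BatchBuffer:
    """Hypothetical model: flush when `flush_every` items are buffered or the
    oldest buffered item has waited `flush_interval` seconds."""

    def __init__(self, flush_every, flush_interval, on_flush, clock=time.monotonic):
        self.flush_every = flush_every
        self.flush_interval = flush_interval
        self.on_flush = on_flush  # receives the list of buffered items
        self.clock = clock
        self.pending = []
        self.first_at = None

    def add(self, item):
        if not self.pending:
            self.first_at = self.clock()
        self.pending.append(item)
        if len(self.pending) >= self.flush_every:
            self._flush()

    def tick(self):
        """Called periodically, like a timer; flushes items that waited too long."""
        if self.pending and self.clock() - self.first_at >= self.flush_interval:
            self._flush()

    def _flush(self):
        batch, self.pending = self.pending, []
        self.on_flush(batch)


# Two gunicorn processes each publish one message for user 42; the worker buffers both.
now = [0.0]
batches = []
buf = BatchBuffer(100, 10, batches.append, clock=lambda: now[0])
buf.add({"user": 42})
buf.add({"user": 42})
now[0] = 10.0
buf.tick()
print(batches)  # → [[{'user': 42}, {'user': 42}]]
```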

Oversized batches of retried tasks

Retrying tasks with countdowns can cause these tasks to pile up in the _pending queue. For argument's sake, let's say there are 10k such tasks (all with the same ETA) in a _pending queue but flush_every is only 10. When the ETA is reached and 10 messages are received from the broker, then the list of SimpleRequest objects passed to the task will be of length 10010. I understand that flush_every=10 is not a promise that the batch size won't be something larger like 10010, but without enforcing some kind of maximum on the batch size, it becomes difficult to reason about how long a batch is expected to take, which is important when setting SQS visibility timeouts for example.
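Since the report says batch size is not capped, one workaround inside the task body (an assumption, not a library feature) is to process the received list in slices of at most flush_every items, for example one database write per slice. This bounds each unit of work but not the task's total runtime, so on its own it does not address the visibility-timeout concern. chunked is a hypothetical helper:

```python
from itertools import islice


def chunked(items, size):
    """Yield lists of at most `size` consecutive items from `items`."""
    iterator = iter(items)
    while chunk := list(islice(iterator, size)):
        yield chunk


# 10,000 retried requests plus 10 fresh ones arrive as one list of 10,010.
batch = list(range(10_010))
slices = list(chunked(batch, 10))
print(len(slices), len(slices[-1]))  # → 1001 10
```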

Capped number of messages queued with flush_every

celery-batches yields fewer messages than specified in flush_every, even though the queue holds significantly more. It always returns 48 messages per batch instead of the 1000 specified when the task is created.

I did some digging with the following script:

create.py

from tasks import add
from tqdm import tqdm
from celery import group

for i in tqdm(range(1000000)):
    add.delay(4)

tasks.py

from celery import Celery
from time import sleep
from celery_batches import Batches

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task(base=Batches, flush_every=1000, flush_interval=10)
def add(requests):
    print(len(requests))

Executing with:

celery -A tasks worker --loglevel=info

This results in continual messages of:

[2019-12-16 07:47:29,675: WARNING/ForkPoolWorker-8] 48
[2019-12-16 07:47:29,675: WARNING/ForkPoolWorker-8] 48
[2019-12-16 07:47:29,675: WARNING/ForkPoolWorker-8] 48

These batches are only flushed because of the timeout set by flush_interval. The root cause seems to be that task_message_handler (inside def Strategy) is only called 48 times, so it never reaches the exit condition of queueing up 1000 messages. Is some internal Celery setting capping the number of outstanding messages passed to task_message_handler? The number appears to be 4 × concurrency: I'm running 12 workers by default, and it drops to 4 with only one worker.

Reproducible with the latest version of Celery (4.4.0) on OS X.
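Celery caps the messages a worker reserves at worker_prefetch_multiplier × concurrency, with a default multiplier of 4, which matches the observed 4 × 12 = 48. Assuming that cap is what limits the batch (the buffer can only hold messages the worker has prefetched), the multiplier has to be large enough that the limit reaches flush_every; per the Celery docs, setting it to 0 removes the limit. prefetch_limit and min_multiplier are hypothetical helpers for the arithmetic:

```python
import math


def prefetch_limit(concurrency, multiplier=4):
    """Messages a worker may reserve: multiplier x concurrency (None when 0 = unlimited)."""
    if multiplier == 0:
        return None
    return multiplier * concurrency


def min_multiplier(flush_every, concurrency):
    """Smallest worker_prefetch_multiplier whose limit reaches flush_every."""
    return math.ceil(flush_every / concurrency)


print(prefetch_limit(12))        # → 48, the observed batch size
print(prefetch_limit(1))         # → 4
print(min_multiplier(1000, 12))  # → 84 (84 * 12 = 1008)
```

Under that assumption, something like app.conf.worker_prefetch_multiplier = 84 (or 0) would let a batch reach 1000 messages.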

Requests not being batched?

I have the following Flask app:

from flask import Flask
from flask_pydantic import validate
from pydantic import BaseModel

from cel import count_click


class Request(BaseModel):
    query: str


app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6000/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6000/0'


@app.post('/run_task')
@validate(Request)
def run_background_task(body: Request):
    result = count_click.delay(query=body.query)
    response = result.wait()
    print(f"Response: {response}")
    return response


if __name__ == '__main__':
    app.run(debug=True, port=7000)

and Celery endpoint:

from celery import Celery, current_app
from celery_batches import Batches

cel = Celery('batching', broker='redis://localhost:6000/0', backend='redis://localhost:6000/0')


# Flush after 10 messages, or 1 second.
@cel.task(base=Batches, flush_every=10, flush_interval=1)
def count_click(requests):
    """Count the number of times each URL is requested."""
    for i, request in enumerate(requests):
        print(f"Request: {i}.{request.kwargs['query']}")
        current_app.backend.mark_as_done(request.id, request.kwargs['query'], request=request)

and made requests with the following snippet.

host = "http://127.0.0.1:7000/run_task"

responses = []

for i in range(4):
    responses += [requests.post(host, json = {'query' : f'query_{i}'})]

but my Celery task is printing each task individually:

[2022-12-12 19:56:57,995: WARNING/ForkPoolWorker-6] Request: 0.query_0
[2022-12-12 19:56:58,507: WARNING/ForkPoolWorker-6] Request: 0.query_1
[2022-12-12 19:56:58,914: WARNING/ForkPoolWorker-6] Request: 0.query_2
[2022-12-12 19:56:59,515: WARNING/ForkPoolWorker-6] Request: 0.query_3

Why aren't my requests being batched?
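A likely explanation: the view blocks in result.wait() until its task has run, and the client loop sends the next POST only after the previous response arrives, so each message is probably alone in the worker's buffer when the 1-second flush_interval fires. Sending the POSTs concurrently gives several messages a chance to arrive within one interval (Flask's development server is threaded by default in current versions). A sketch using only the standard library (urllib instead of requests); HOST mirrors the original snippet, and post_json/post_all are hypothetical helpers.

```python
import json
import urllib.request
from concurrent.futures import ThreadPoolExecutor

HOST = "http://127.0.0.1:7000/run_task"


def post_json(payload, url=HOST):
    """POST one JSON body and return the response text."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode()


def post_all(payloads, send=post_json, workers=4):
    """Send all payloads at once; results come back in input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(send, payloads))
```

Usage against the Flask app above: post_all([{"query": f"query_{i}"} for i in range(4)]).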

Deserialising RabbitMQ messages fails when I switch to batch execution using celery-batches

Problem description

I had an update task in Celery that aggregates long-running document update requests together in Postgres and executes them in batches. Since this was not optimal (updating the aggregated lists in the database is a huge amount of work), I tried using celery-batches to retrieve and process the individual tasks in batches and thus make fewer database updates.
As soon as I switch to celery-batches, I get decoder errors when trying to continue working on the old queue (I assumed this should be possible, since celery-batches seems to only aggregate the existing requests; is that right?)

However, the log files of my Celery worker show errors when deserialising the RabbitMQ messages. Those messages deserialised just fine when I was processing tasks one at a time.

Am I doing something wrong, or is this an issue in celery-batches? I'm happy to debug it myself and create a patch, but I'd first like to confirm that I'm using celery-batches as intended.

Example data

Celery message (taken from RabbitMQ Management): [screenshot not available]

Task

@shared_task(
    base=Batches,
    flush_every=1000,
    flush_interval=10,
    bind=False,
    queue='did_update_aggregation',
    routing_key='omg.did_update_aggregation',
    max_retries=5)
def aggregate_did_task(requests):
    aggregate_by_scenario_id = {}
    for request in requests:
        scenario_id = request.args[0]
        did = request.args[1]
        if scenario_id not in aggregate_by_scenario_id:
            aggregate_by_scenario_id[scenario_id] = [did]
        else:
            aggregate_by_scenario_id[scenario_id].append(did)
    for scenario_id, did_list in aggregate_by_scenario_id.iteritems():
        aggregate_dids(scenario_id=scenario_id, did_list=did_list)

previous implementation

@shared_task(
    base=Task,
    bind=False,
    queue='did_update_aggregation',
    routing_key='omg.did_update_aggregation',
    max_retries=5)
def aggregate_did_task(scenario_id, did, **_kwargs):
    aggregate_did(scenario_id=scenario_id, did=did)

Async call
aggregate_did_task.apply_async(args=(scenario_id, did,))

Stacktrace in log

  File "/usr/local/lib/python2.7/site-packages/vine/promises.py", line 140, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/base.py", line 218, in _read
    drain_events(timeout=0)
  File "/usr/local/lib/python2.7/site-packages/librabbitmq/__init__.py", line 227, in drain_events
    self._basic_recv(timeout)
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 567, in on_task_received
    callbacks,
  File "/usr/local/lib/python2.7/site-packages/celery_batches/__init__.py", line 254, in task_message_handler
    flush_buffer()
  File "/usr/local/lib/python2.7/site-packages/celery_batches/__init__.py", line 269, in _do_flush
    self.flush(requests)
  File "/usr/local/lib/python2.7/site-packages/celery_batches/__init__.py", line 260, in flush
    for r in requests],))
  File "/usr/local/lib/python2.7/site-packages/celery_batches/__init__.py", line 195, in from_request
    args, kwargs, embed = request._payload
  File "/usr/local/lib/python2.7/site-packages/kombu/utils/objects.py", line 44, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/request.py", line 478, in _payload
    return self.body if self._decoded else self.message.payload
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 207, in payload
    return self._decoded_cache if self._decoded_cache else self.decode()
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 192, in decode
    self._decoded_cache = self._decode()
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 197, in _decode
    self.content_encoding, accept=self.accept)
  File "/usr/local/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
    return decode(data)
  File "/usr/local/lib/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/local/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors
    yield
  File "/usr/local/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
    return decode(data)
  File "/usr/local/lib/python2.7/site-packages/bson/json_util.py", line 438, in loads
    return json.loads(s, *args, **kwargs)
  File "/usr/local/lib/python2.7/json/__init__.py", line 352, in loads
    return cls(encoding=encoding, **kw).decode(s)
  File "/usr/local/lib/python2.7/json/decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
DecodeError: expected string or buffer

How to get the batches return value?

Hi,

I'm using the example code and trying to get the return value with get(). However, it raises the following error:

Traceback (most recent call last):
  File "/mnt/d/pycharm/smt_simulator/test.py", line 9, in <module>
    process.get()
  File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 228, in get
    on_message=on_message,
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/base.py", line 571, in wait_for_pending
    no_ack=no_ack,
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/base.py", line 873, in _is_disabled
    raise NotImplementedError(E_NO_BACKEND.strip())
NotImplementedError: No result backend is configured.
Please see the documentation for more information.

The worker code is:

import requests
from urllib.parse import urlparse
from celery_batches import Batches
from celery import Celery

app = Celery('google_streamer',broker='redis://localhost:6379')
app.conf.worker_prefetch_multiplier = 0
wot_api_target = 'https://api.mywot.com/0.4/public_link_json'

@app.task(base=Batches, flush_every=100, flush_interval=10)
def wot_api(requests):
    sig = lambda url: url
    responses = wot_api_real(
        (sig(*request.args, **request.kwargs) for request in requests)
    )
    # use mark_as_done to manually return response data
    for response, request in zip(responses, requests):
        app.backend.mark_as_done(request.id, response)

def wot_api_real(urls):
    domains = [urlparse(url).netloc for url in urls]
    response = requests.get(
        wot_api_target,
        params={'hosts': ('/').join(set(domains)) + '/'}
    )
    return [response.json()[domain] for domain in domains]

The command used to run Celery is:
celery -A tasks worker -l info -B
The calling code is:

from tasks import wot_api

process = wot_api.delay('http://example.com')
process.get()
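The traceback reports that no result backend is configured: the worker code creates the app with only a broker URL, so there is nowhere for mark_as_done to store results or for get() to read them. A minimal configuration sketch, assuming the same local Redis server serves as both broker and result backend (the /0 database index is an arbitrary choice); the calling side imports the same app, so it picks up the same backend.

```python
from celery import Celery

# Assumption: one local Redis server acts as both broker and result backend.
app = Celery(
    "google_streamer",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
```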

Support task-failed event for batch tasks

I'm opening this issue to discuss / work on task-failed events. For context, events let us (developers) monitor our celery-batches apps with tools such as Prometheus (for example, in combination with a Celery exporter).

Therefore, supporting most (if not all) of the events Celery emits would be a huge benefit.

Multiple tasks issue

I'm running multiple batch tasks, but every time, after running for a while, batching stops working and only one task is processed at a time.

Example:

@celery.task(name="create_task_a", base=Batches, flush_every=20, flush_interval=1.5)
def create_task_a(): 

@celery.task(name="create_task_b", base=Batches, flush_every=20, flush_interval=1.5)
def create_task_b(): 

If I only run create_task_a, everything works: Celery processes the tasks in batches of 20.
But if I run create_task_a and create_task_b at the same time, batching works at the beginning and then stops after a few batches:

iteration: 1
[create_task_a, create_task_a...... 20 in total]
iteration: 2
[create_task_a, create_task_a......20 in total]
iteration: 3
[create_task_b create_task_b......20 in total]
iteration: 4
[create_task_a create_task_a......20 in total]
..... after 5-6 iteration
iteration:
[create_task_a create_task_a......8 in total]
[create_task_b create_task_b......6 in total]
[create_task_b create_task_b......3 in total]
..... after 15-20 iteration
[create_task_a]
[create_task_a]
[create_task_b]
[create_task_b]
[create_task_a]
[create_task_b]

How can I fix this? Thanks in advance.

Remove upper bounds on version constraints

@clokep Hi!

Can you please remove the upper bounds on the celery version constraint and publish a release?

This is blocking me from verifying compatibility of celery-batches with celery 5.4.0+ (see #87). Here celery is a transitive dependency that blocks a larger update, and I expect others are similarly bottlenecked.

Rationale:

  • Removing the upper bound does not imply compatibility
  • Those who run into issues can report a bug or open a pull request, instead of the package failing to install.
  • Package managers such as poetry enforce these upper bounds strictly, which prevents even testing basic functionality
    • There may not be any breakage at all, in which case downstream users are blocked unnecessarily.

Run tests with different brokers / backends

It would be nice to run the tests against brokers / backends other than the in-memory ones. We could read the broker / backend to use from environment variables and use Docker containers in GitHub Actions to do this in CI.
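A minimal sketch of the environment-variable idea. CELERY_BROKER_URL and CELERY_RESULT_BACKEND are hypothetical variable names; the fallbacks memory:// and cache+memory:// are Celery's in-memory broker transport and cache-backed result backend. The returned dict could be passed to app.conf.update() or returned from the celery_config fixture of Celery's pytest plugin, while a services: entry in the GitHub Actions workflow starts a Redis or RabbitMQ container and the job sets the variables.

```python
import os

DEFAULTS = {
    "broker_url": "memory://",
    "result_backend": "cache+memory://",
}


def celery_test_config(environ=os.environ):
    """Build Celery settings from env vars, falling back to in-memory transports."""
    return {
        "broker_url": environ.get("CELERY_BROKER_URL", DEFAULTS["broker_url"]),
        "result_backend": environ.get("CELERY_RESULT_BACKEND", DEFAULTS["result_backend"]),
    }


print(celery_test_config({}))
```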

Celery batches and CELERY_TASK_ALWAYS_EAGER

I'd like to use CELERY_TASK_ALWAYS_EAGER for a simple worker-less setup and for testing (I know it's not the best approach, but it's certainly the simplest to start with), but it doesn't seem to play well with celery-batches.

Is there some way to make eager mode work with batches, or is it simply a no-go and I always need a worker?

Unable to install with celery 5.3.1

Thanks for merging #75, but I'm not able to install with celery 5.3.1, presumably because of this line:

install_requires = celery>=5.0,<5.3

Does the 5.3 need changing to 5.4?

The error when trying to run pip-compile is:

There are incompatible versions in the resolved dependencies:
  celery==5.3.1 from https://files.pythonhosted.org/packages/18/b9/cb8d519ea0094b9b8fe7480225c14937517729f8ec927643dc7379904f64/celery-5.3.1-py3-none-any.whl (from -r requirements.in (line 11))
  celery<5.3,>=5.0 (from celery-batches==0.8->-r requirements.in (line 12))

Doesn't seem to support rpc://

Hi,

Thanks for the nice library! I have a problem with rpc as the results backend, while the file-system backend works fine. I couldn't get rpc to work even with basic return values. Here's my minimal example:

server.py

import logging

from celery import Celery
from celery_batches import Batches


app = Celery('example',
             backend='rpc://',
            #  backend='file://.celery_store',
             broker='amqp://guest@localhost//',
)

@app.task(base=Batches, flush_every=3, flush_interval=0.1)
def process(inputs=None):
    logging.info('Received {} tasks'.format(len(inputs)))
    for inp in inputs:
        app.backend.mark_as_done(inp.id, 'ok')

client.py

from server import process


if __name__ == '__main__':
    tasks = [process.delay() for _ in range(3)]

    for task in tasks:
        output = task.get(timeout=10)

The client script fails with celery.exceptions.TimeoutError: The operation timed out..

Library versions:

celery==5.0.5
celery-batches==0.4

More examples

Carried forward from #42 (comment) by @clokep

  • Keep in examples/
  • Back them with pytest so CI automatically detects breakages
  • Copy-paste friendly
  • #53: Potentially show interoperation with other base classes

outdated documentation?

Hi,

The first example in the documentation does not work. Can you provide a simple example with named arguments?

TypeError: count_click() got an unexpected keyword argument 'url'

Support success event for batched tasks

Hi @clokep, thanks for the package. I'd like to know a bit more about the SimpleRequest class. Its docstring says (SimpleRequest generally should have the same properties as :class:~celery.worker.request.Request), but in the code SimpleRequest has only a subset of the properties of :class:~celery.worker.request.Request.
So if I want to use some of the methods (e.g. send_event) or properties of :class:~celery.worker.request.Request, what is the suggested/best way to do that with SimpleRequest?

My use case: I want to call send_event('task-succeeded') after processing each request in the batch and saving its result in the result backend, so that Flower (a monitoring tool) or any other event listener can mark the request as successfully processed. Otherwise the request stays in the pending state.
