clokep / celery-batches
Celery Batches allows processing of multiple Celery task requests together
Home Page: https://celery-batches.readthedocs.io/
License: Other
Hello, celery 5.4 is released. The setup.py requires <5.4. Would you like any help testing/supporting 5.4?
The only place I've found linking to the docs is the badge. Personally, in my own projects I've linked the badge to RTD / GitHub Actions build status.
Link to https://celery-batches.readthedocs.io/en/latest/ from PyPI, GitHub, and the README (beyond just the badge).
celery-singleton (https://github.com/steinitzu/celery-singleton) provides a Singleton base task that prevents a task with the same args from refiring. What would a combination with celery_batches.Batches potentially look like?
Hi,
I'm using celery-batches and following the example in the documentation, with modifications to fit my own needs.
I'm expecting to call the decorated function with get_batch_result.delay(char, font). However, I don't know how to declare the function's arguments accordingly.
If I try def get_batch_result(requests), the following error occurs when a task comes in:
TypeError: get_batch_result() takes 1 positional argument but 2 were given
If I try def get_batch_result(requests1, requests2), a task comes in and it prints:
TypeError: get_batch_result() missing 1 required positional argument: 'requests2'
I also tried def get_batch_result(*requests). Now requests is a tuple containing one list of SimpleRequest objects. The tuple part doesn't seem to match what the documentation describes, so I'm not sure if I'm doing this right.
Is there a standard way to do this? Or should I pack my arguments into a list, a tuple, or kwargs as a workaround for now?
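For reference, a minimal sketch of the documented calling pattern (the task and argument names here are illustrative): the Batches task itself takes a single list of SimpleRequest objects, and the arguments of each .delay() call are read back from request.args / request.kwargs.

from celery import Celery
from celery_batches import Batches, SimpleRequest

app = Celery("example", broker="memory://")

@app.task(base=Batches, flush_every=10, flush_interval=5)
def get_batch_result(requests: list[SimpleRequest]):
    for request in requests:
        # get_batch_result.delay(char, font) shows up as positional args here
        char, font = request.args
        print(char, font)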
I'm trying to use celery-batches in a chain, like this:
chain(
    task_foo.s(*args, **kwargs),
    batch_task.s(*args, **kwargs),
    task_bar.s(*args, **kwargs),
).apply_async()
Celery never registers task_bar as a child of batch_task, and the chain cuts off. What could be the cause, and how can I fix it?
Hello!
How can I use it with Flower?
"create_task" works fine with Flower, but I can't see "count_click" tasks in Flower.
What could I be doing wrong?
It feels like I'm using the wrong namespaces or startup arguments.
from .celery import celery_app
import time
from celery_batches import Batches, SimpleRequest
from collections import Counter


@celery_app.task(name="create_task")
def create_task(task_type):
    time.sleep(int(task_type) * 2)
    return True


@celery_app.task(base=Batches, flush_every=5, flush_interval=2)
def count_click(requests: list[SimpleRequest]):
    """Count the number of times each URL is requested."""
    counts = Counter(request.kwargs["click"] for request in requests)
    for url, count in counts.items():
        print(f">>> Clicks: {url} -> {count}")
    for request in requests:
        celery_app.backend.mark_as_done(request.id, 'hi', request=request)
A working example would be appreciated.
Celery 5.3 is out as of June 6th, 2023!
@clokep: Are you open to removing the <5.3 constraint completely and publishing celery-batches to PyPI with a version bump (e.g. 0.7.1)?
❯ poetry update celery
Updating dependencies
Resolving dependencies... (11.8s)
Because celery-batches (0.7) depends on celery (>=4.4,<5.3)
and no versions of celery-batches match >0.7.0,<0.8.0, celery-batches (>=0.7.0,<0.8.0) requires celery (>=4.4,<5.3).
So, because eduflow depends on both celery (~5.3.0) and celery-batches (~0.7.0), version solving failed.
Hi! Thank you for the project @clokep.
I have some confusion with the docs example, between these conventions: requests, celery's celery.worker.request, celery-batches's own celery_batches.SimpleRequest, and the example's requests parameter in wot_api(requests).
import requests
from urllib.parse import urlparse
from celery_batches import Batches

wot_api_target = 'https://api.mywot.com/0.4/public_link_json'

@app.task(base=Batches, flush_every=100, flush_interval=10)
def wot_api(requests):
    sig = lambda url: url
    responses = wot_api_real(
        (sig(*request.args, **request.kwargs) for request in requests)
    )
    # use mark_as_done to manually return response data
    for response, request in zip(responses, requests):
        app.backend.mark_as_done(request.id, response, request=request)

def wot_api_real(urls):
    domains = [urlparse(url).netloc for url in urls]
    response = requests.get(
        wot_api_target,
        params={'hosts': ('/').join(set(domains)) + '/'}
    )
    return [response.json()[domain] for domain in domains]
requests is imported, but it also seems to be used as the parameter name in def wot_api(requests).
Proposal:
To avoid ambiguity, we could make an example that avoids using the requests package completely (despite it being ubiquitous, its name oddly collides in this example).
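For instance, something along the lines of the count_click example already in these docs avoids the requests library entirely (a rough sketch; the app object is assumed):

from collections import Counter

from celery_batches import Batches, SimpleRequest

@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests: list[SimpleRequest]):
    """Count the number of times each URL is requested."""
    counts = Counter(request.kwargs["url"] for request in requests)
    for url, clicks in counts.items():
        print(f">>> Clicks: {url} -> {clicks}")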
@clokep Are you open to collaborating with me on improving the docs? I'm willing to spend a few hours cleaning it up (though I'm not sure if you have your own ideas in mind).
Hi! I was looking at plugins for Celery on GitHub and found celery-batches, but didn't quite get how it actually works.
It'd be great to add a "How does it work?" section to README.md, as in https://github.com/steinitzu/celery-singleton#how-does-it-work
For instance, I have a usual HTTP API setup: gunicorn runs django with celery's app inside (not a worker, just a publisher). gunicorn runs 5 processes of the django HTTP app, and django is the only code that calls celery tasks, like this: last_seen_user.delay(user=user_id, when=utcnow()). celery's workers are run with celery --concurrency=4.
How does celery-batches work in this setup?
The documentation says:
Task requests are buffered in memory (on a worker) until either the flush count or flush interval is reached.
Does it mean that task requests are buffered inside one of the 5 django processes that handled the HTTP request? I mean, if the user sends two HTTP requests and gunicorn routes them to different processes, we end up with two local (in-memory) queues for this batch task, even for the same user, and after flush_interval or flush_every (counted only for THAT process, not across all of them) we actually send the task to the celery broker; in our case we'd send two tasks even though it was the same user.
Retrying tasks with countdowns can cause these tasks to pile up in the _pending queue. For argument's sake, let's say there are 10k such tasks (all with the same ETA) in a _pending queue but flush_every is only 10. When the ETA is reached and 10 messages are received from the broker, then the list of SimpleRequest objects passed to the task will be of length 10010. I understand that flush_every=10 is not a promise that the batch size won't be something larger like 10010, but without enforcing some kind of maximum on the batch size, it becomes difficult to reason about how long a batch is expected to take, which is important when setting SQS visibility timeouts for example.
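Not a fix for the underlying behaviour, but one workaround sketch (MAX_CHUNK and handle_chunk below are hypothetical, not part of celery-batches): slice the delivered requests inside the task body so each downstream operation stays bounded, even though the flush as a whole still processes everything it was handed.

MAX_CHUNK = 100  # hypothetical per-slice cap

@app.task(base=Batches, flush_every=10, flush_interval=10)
def process(requests):
    # A pile-up may deliver far more than flush_every requests;
    # handle them in bounded slices to keep each unit of work predictable.
    for start in range(0, len(requests), MAX_CHUNK):
        handle_chunk(requests[start:start + MAX_CHUNK])  # hypothetical helper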
I use this on a few projects:
https://github.com/tmux-python/tmuxp/blob/master/pyproject.toml, https://github.com/tmux-python/libtmux/blob/master/pyproject.toml, https://github.com/cihai/unihan-etl/blob/master/pyproject.toml
poetry shell
poetry install
poetry publish to make the wheel / sdist and push to PyPI

celery-batches is yielding fewer messages than specified in flush_every, despite there being significantly more messages in the queue. It always returns 48 messages in a batch instead of the 1000 specified at task creation time.
I did some digging with the following script:
create.py
from tasks import add
from tqdm import tqdm
from celery import group
for i in tqdm(range(1000000)):
    add.delay(4)
tasks.py
from celery import Celery
from time import sleep
from celery_batches import Batches
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task(base=Batches, flush_every=1000, flush_interval=10)
def add(requests):
    print(len(requests))
Executing with:
celery -A tasks worker --loglevel=info
This results in continual messages of:
[2019-12-16 07:47:29,675: WARNING/ForkPoolWorker-8] 48
[2019-12-16 07:47:29,675: WARNING/ForkPoolWorker-8] 48
[2019-12-16 07:47:29,675: WARNING/ForkPoolWorker-8] 48
These messages only get flushed because of the timeout specified by flush_interval. The root cause seems to be that task_message_handler (within Strategy) itself only gets called 48 times, so it never reaches the exit condition of queueing up 1000 messages. Is there an internal Celery setting that caps the number of outstanding messages passed to task_message_handler? This number appears to be 4 * concurrency, since I'm dealing with 12 worker processes by default, and it drops down to 4 when used with only one worker process.
Reproducible with the latest version of Celery, 4.4.0, on OS X.
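For context, a hedged guess at the cause rather than a confirmed one: Celery's broker prefetch limit defaults to worker_prefetch_multiplier (4) multiplied by the number of worker processes, which matches the 48 and 4 figures above. Raising it should let more messages reach the buffer before a flush, e.g.:

# Illustrative only: allow the worker to prefetch enough messages to fill a batch.
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.worker_prefetch_multiplier = 1000  # Celery's default is 4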
I have the following Flask app:
from flask import Flask
from flask_pydantic import validate
from pydantic import BaseModel

from cel import count_click


class Request(BaseModel):
    query: str


app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6000/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6000/0'


@app.post('/run_task')
@validate(Request)
def run_background_task(body: Request):
    result = count_click.delay(query=body.query)
    response = result.wait()
    print(f"Response: {response}")
    return response


if __name__ == '__main__':
    app.run(debug=True, port=7000)
and the Celery task module:
from celery import Celery, current_app
from celery_batches import Batches

cel = Celery('batching', broker='redis://localhost:6000/0', backend='redis://localhost:6000/0')


# Flush after 10 messages, or 1 second.
@cel.task(base=Batches, flush_every=10, flush_interval=1)
def count_click(requests):
    """Count the number of times each URL is requested."""
    for i, request in enumerate(requests):
        print(f"Request: {i}.{request.kwargs['query']}")
        current_app.backend.mark_as_done(request.id, request.kwargs['query'], request=request)
and made requests with the following snippet:
import requests

host = "http://127.0.0.1:7000/run_task"
responses = []
for i in range(4):
    responses += [requests.post(host, json={'query': f'query_{i}'})]
but my Celery task is printing each task individually:
[2022-12-12 19:56:57,995: WARNING/ForkPoolWorker-6] Request: 0.query_0
[2022-12-12 19:56:58,507: WARNING/ForkPoolWorker-6] Request: 0.query_1
[2022-12-12 19:56:58,914: WARNING/ForkPoolWorker-6] Request: 0.query_2
[2022-12-12 19:56:59,515: WARNING/ForkPoolWorker-6] Request: 0.query_3
Why aren't my requests being batched?
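One hedged observation rather than a confirmed diagnosis: the snippet sends each HTTP request only after the previous one returns, and the Flask view blocks on result.wait(), so at most one task is outstanding at a time and each flush_interval expires with a single buffered request. A quick way to check whether batching works at all, bypassing Flask, is to enqueue several tasks before waiting on any of them:

from cel import count_click

# Queue several tasks first, then collect results, so requests can
# accumulate in the worker's buffer before the flush.
results = [count_click.delay(query=f"query_{i}") for i in range(4)]
print([r.get(timeout=10) for r in results])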
I had an update task in Celery that aggregates long-running document update requests together in Postgres and executes them in batches. Since this was not optimal (a huge amount of work to update aggregated lists in the database), I tried celery-batches to retrieve and process the single tasks in batches and thus have fewer database updates.
As soon as I switch to celery-batches, I get decoder errors when trying to continue work on the old queue (I figured this should be possible, since celery-batches seems to only aggregate the previous requests - is that right?).
However, when I check the log files of my Celery worker, I see errors when deserialising the RabbitMQ messages. Those messages deserialized just fine when I was in single-processing mode.
Is there anything I am doing wrong, or is this an issue in celery-batches? I am happy to debug it myself and create a patch, but I would like to first confirm that what I am doing is correct and that I am using celery-batches as intended.
Celery message (taken from RabbitMQ Management):
Task
@shared_task(
    base=Batches,
    flush_every=1000,
    flush_interval=10,
    bind=False,
    queue='did_update_aggregation',
    routing_key='omg.did_update_aggregation',
    max_retries=5)
def aggregate_did_task(requests):
    aggregate_by_scenario_id = {}
    for request in requests:
        scenario_id = request.args[0]
        did = request.args[1]
        if scenario_id not in aggregate_by_scenario_id:
            aggregate_by_scenario_id[scenario_id] = [did]
        else:
            aggregate_by_scenario_id[scenario_id].append(did)
    for scenario_id, did_list in aggregate_by_scenario_id.iteritems():
        aggregate_dids(scenario_id=scenario_id, did_list=did_list)
Previous implementation
@shared_task(
    base=Task,
    bind=False,
    queue='did_update_aggregation',
    routing_key='omg.did_update_aggregation',
    max_retries=5)
def aggregate_did_task(scenario_id, did, **_kwargs):
    aggregate_did(scenario_id=scenario_id, did=did)
Async call
aggregate_did_task.apply_async(args=(scenario_id, did,))
Stacktrace in log
File "/usr/local/lib/python2.7/site-packages/vine/promises.py", line 140, in __call__
retval = fun(*final_args, **final_kwargs)
File "/usr/local/lib/python2.7/site-packages/kombu/transport/base.py", line 218, in _read
drain_events(timeout=0)
File "/usr/local/lib/python2.7/site-packages/librabbitmq/__init__.py", line 227, in drain_events
self._basic_recv(timeout)
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 624, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 567, in on_task_received
callbacks,
File "/usr/local/lib/python2.7/site-packages/celery_batches/__init__.py", line 254, in task_message_handler
flush_buffer()
File "/usr/local/lib/python2.7/site-packages/celery_batches/__init__.py", line 269, in _do_flush
self.flush(requests)
File "/usr/local/lib/python2.7/site-packages/celery_batches/__init__.py", line 260, in flush
for r in requests],))
File "/usr/local/lib/python2.7/site-packages/celery_batches/__init__.py", line 195, in from_request
args, kwargs, embed = request._payload
File "/usr/local/lib/python2.7/site-packages/kombu/utils/objects.py", line 44, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/usr/local/lib/python2.7/site-packages/celery/worker/request.py", line 478, in _payload
return self.body if self._decoded else self.message.payload
File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 207, in payload
return self._decoded_cache if self._decoded_cache else self.decode()
File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 192, in decode
self._decoded_cache = self._decode()
File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 197, in _decode
self.content_encoding, accept=self.accept)
File "/usr/local/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
return decode(data)
File "/usr/local/lib/python2.7/contextlib.py", line 35, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/usr/local/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/usr/local/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
return decode(data)
File "/usr/local/lib/python2.7/site-packages/bson/json_util.py", line 438, in loads
return json.loads(s, *args, **kwargs)
File "/usr/local/lib/python2.7/json/__init__.py", line 352, in loads
return cls(encoding=encoding, **kw).decode(s)
File "/usr/local/lib/python2.7/json/decoder.py", line 364, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
DecodeError: expected string or buffer
Hi,
I am using the example code and trying to get the return value by using the get() function. However, it raised the error below:
Traceback (most recent call last):
File "/mnt/d/pycharm/smt_simulator/test.py", line 9, in <module>
process.get()
File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 228, in get
on_message=on_message,
File "/usr/local/lib/python3.6/dist-packages/celery/backends/base.py", line 571, in wait_for_pending
no_ack=no_ack,
File "/usr/local/lib/python3.6/dist-packages/celery/backends/base.py", line 873, in _is_disabled
raise NotImplementedError(E_NO_BACKEND.strip())
NotImplementedError: No result backend is configured.
Please see the documentation for more information.
The worker code is:
import requests
from urllib.parse import urlparse
from celery_batches import Batches
from celery import Celery

app = Celery('google_streamer', broker='redis://localhost:6379')
app.conf.worker_prefetch_multiplier = 0

wot_api_target = 'https://api.mywot.com/0.4/public_link_json'

@app.task(base=Batches, flush_every=100, flush_interval=10)
def wot_api(requests):
    sig = lambda url: url
    responses = wot_api_real(
        (sig(*request.args, **request.kwargs) for request in requests)
    )
    # use mark_as_done to manually return response data
    for response, request in zip(responses, requests):
        app.backend.mark_as_done(request.id, response)

def wot_api_real(urls):
    domains = [urlparse(url).netloc for url in urls]
    response = requests.get(
        wot_api_target,
        params={'hosts': ('/').join(set(domains)) + '/'}
    )
    return [response.json()[domain] for domain in domains]
The Celery run command is:
celery -A tasks worker -l info -B
The calling code is:
from tasks import wot_api
process = wot_api.delay('http://example.com')
process.get()
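The traceback says no result backend is configured, and the Celery() call above only sets a broker. A minimal sketch of the likely fix, assuming the same local Redis should also store results (the examples elsewhere in these docs additionally pass request=request to mark_as_done):

app = Celery('google_streamer',
             broker='redis://localhost:6379',
             backend='redis://localhost:6379')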
I'm opening this issue to discuss / work on the task-failed events. For information, using events enables us (developers) to monitor our Celery batch apps using tools such as Prometheus (in combination with a CeleryExporter, for example).
Therefore, having most (if not all) events used by Celery would be a huge benefit.
I'm trying to run multiple batch tasks, but after running for a while, batching stops working and only one task is processed at a time.
Example:
@celery.task(name="create_task_a", base=Batches, flush_every=20, flush_interval=1.5)
def create_task_a(requests):
    ...

@celery.task(name="create_task_b", base=Batches, flush_every=20, flush_interval=1.5)
def create_task_b(requests):
    ...

If I only run create_task_a, it works fine: Celery processes tasks in batches of 20.
But if I run create_task_a and create_task_b at the same time, it is always fine at the beginning and then stops batching after a few batches:
iteration: 1
[create_task_a, create_task_a...... 20 in total]
iteration: 2
[create_task_a, create_task_a......20 in total]
iteration: 3
[create_task_b create_task_b......20 in total]
iteration: 4
[create_task_a create_task_a......20 in total]
..... after 5-6 iterations
iteration:
[create_task_a create_task_a......8 in total]
[create_task_b create_task_b......6 in total]
[create_task_b create_task_b......3 in total]
..... after 15-20 iterations
[create_task_a]
[create_task_a]
[create_task_b]
[create_task_b]
[create_task_a]
[create_task_b]
How can I fix this issue? Thanks in advance.
celery-batches doesn't do retries; this is described in more detail in celery/celery#1498, along with a possible workaround.
@clokep Hi!
Can you please remove the upper bounds on the version constraints for celery and publish a release?
This is affecting my ability to verify compatibility of celery-batches with celery 5.4.0+ (see #87). In this case, celery is a transitive dependency blocking a larger update. I expect others are similarly bottlenecked.
Rationale:
Tools such as poetry enforce these upper bounds strictly, inhibiting the ability to test basic functionality.
See celery v5.3.0a1 (and kombu v5.3.0a1).
It would be nice to run the tests with brokers / backends that aren't just in-memory. We could probably read the broker / backend to use from environment variables and use Docker containers in GitHub Actions to do this in CI.
I'd like to use CELERY_TASK_ALWAYS_EAGER for a simple worker-less setup and testing (I know it's not the best way, but it's certainly the simplest to begin with), but that doesn't seem to play well with celery-batches.
Is there some way to make eager mode work with batches, or is it simply a no-go, so that I always need a worker to make it work?
Thanks for merging #75, but I'm not able to install with celery 5.3.1, presumably because of this line:
Line 38 in e7e3f2c
Does the 5.3 need changing to 5.4?
The error when trying to run pip-compile
is:
There are incompatible versions in the resolved dependencies:
celery==5.3.1 from https://files.pythonhosted.org/packages/18/b9/cb8d519ea0094b9b8fe7480225c14937517729f8ec927643dc7379904f64/celery-5.3.1-py3-none-any.whl (from -r requirements.in (line 11))
celery<5.3,>=5.0 (from celery-batches==0.8->-r requirements.in (line 12))
Hi,
Thanks for the nice library! I have a problem with rpc as the results backend, while the file-system backend works fine. I couldn't make rpc work even with some basic return values. Here's my minimal example:
server.py
import logging

from celery import Celery
from celery_batches import Batches

app = Celery('example',
             backend='rpc://',
             # backend='file://.celery_store',
             broker='amqp://guest@localhost//',
             )

@app.task(base=Batches, flush_every=3, flush_interval=0.1)
def process(inputs=None):
    logging.info('Received {} tasks'.format(len(inputs)))
    for inp in inputs:
        app.backend.mark_as_done(inp.id, 'ok')
client.py
from server import process

if __name__ == '__main__':
    tasks = [process.delay() for _ in range(3)]
    for task in tasks:
        output = task.get(timeout=10)

The client script fails with celery.exceptions.TimeoutError: The operation timed out.
Library versions:
celery==5.0.5
celery-batches==0.4
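One hedged observation, not a verified fix: the other examples in these docs pass the originating request to mark_as_done, and the rpc:// backend relies on per-request reply routing, so the server loop might need to do the same:

for inp in inputs:
    app.backend.mark_as_done(inp.id, 'ok', request=inp)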
Carried forward from #42 (comment) by @clokep
examples/
Hi,
The first example in the documentation does not work. Can you provide a simple example with named arguments?
TypeError: count_click() got an unexpected keyword argument 'url'
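For what it's worth, a minimal sketch of how named arguments are normally consumed with a Batches task (illustrative, assuming an existing app object): the decorated function receives only the list of requests, and the keyword arguments of each original call are read from request.kwargs.

@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
    for request in requests:
        # count_click.delay(url='http://example.com') ends up here:
        print(request.kwargs['url'])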
Hi @clokep, thanks for the package. I'm interested to know a bit more about the SimpleRequest class. The docstring for SimpleRequest says it should generally have the same properties as celery.worker.request.Request, but in the code SimpleRequest has only a subset of those properties.
So if I want to use some of the methods (e.g. send_event) or properties of celery.worker.request.Request, what is the suggested/best way to do that with SimpleRequest?
My use case: I want to send send_event('task-succeeded') after processing and saving (in the result backend) the result of each request received from the list of batched requests, so that Flower (a monitoring tool) or any other event listener can mark the request as successfully processed. Otherwise the request stays in a pending state.
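For what it's worth, a rough and unverified sketch of one possible approach (the do_work helper, the app object, and the exact event fields are assumptions; field names loosely follow Celery's task event protocol, and whether Flower accepts events emitted this way is untested). Flower also generally needs task events enabled on the worker (e.g. starting it with -E).

@app.task(base=Batches, flush_every=100, flush_interval=10)
def process(requests):
    # Emit a task-succeeded event per request after storing its result.
    with app.events.default_dispatcher() as dispatcher:
        for request in requests:
            result = do_work(request)  # hypothetical helper
            app.backend.mark_as_done(request.id, result, request=request)
            dispatcher.send('task-succeeded', uuid=request.id,
                            result=str(result), runtime=0.0)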
Is there any recommendation on how to properly wire up celery-batches with Flask when an application uses the integration recipe described at this URL? Our application uses the recipe by subclassing the Celery Task class, so when trying to integrate with celery-batches and specifying base= to define the task implementation to use, it bypasses that wiring.
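One possibility, purely as an unverified sketch (not confirmed against celery-batches internals): instead of the recipe's ContextTask, subclass Batches so that batch tasks still run inside the Flask application context, and pass that subclass as base. The flask_app and celery_app names below are assumed from the recipe.

from celery_batches import Batches

class ContextBatches(Batches):
    def __call__(self, *args, **kwargs):
        # Wrap execution in the Flask app context, mirroring the ContextTask recipe.
        with flask_app.app_context():
            return super().__call__(*args, **kwargs)

@celery_app.task(base=ContextBatches, flush_every=100, flush_interval=10)
def my_batch_task(requests):
    ...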