
dvc-task

dvc-task is a library for queuing, running and managing background jobs (processes) from standalone Python applications. dvc-task is built on Celery, but does not require a full AMQP messaging server (or any other "heavy" servers which are traditionally used as Celery brokers).

Features

  • dvc_task.proc module for running and managing background processes in Celery tasks
  • Preconfigured Celery app intended for use in standalone desktop applications
    • Uses Kombu filesystem transport as the message broker, and the standard filesystem Celery results backend
    • Allows standalone applications to make use of Celery without the use of additional messaging and results backend servers
  • Preconfigured "temporary" Celery worker which will automatically terminate itself when the Celery queue is empty
    • Allows standalone applications to start Celery workers as needed directly from Python code (as opposed to requiring a "run-forever" daemonized CLI celery worker)

Requirements

  • Celery 5.3 or later
  • Kombu 5.3 or later

Note: Windows is not officially supported in Celery, but dvc-task is tested on Windows (and used in DVC on Windows).

Installation

You can install dvc-task via pip from PyPI:

$ pip install dvc-task

Usage

Processes (dvc_task.proc)

The process module provides a simple API for managing background processes in Celery tasks. Background processes are run in Celery tasks, but process state is stored separately from Celery, so information about managed processes can be accessed from outside of the Celery producer or consumer application.

After you have configured a Celery application, jobs can be queued (and run) via ProcessManager.run (which returns a signature for the proc.tasks.run Celery task):

from dvc_task.proc import ProcessManager

@app.task  # `app` is your configured Celery application
def my_task():
    manager = ProcessManager(wdir=".")
    manager.run(["echo", "hello world"], name="foo").delay()

The ProcessManager will create a subdirectory in wdir for each managed process.

$ tree .
.
└── 25mYD6MyLNewXXdMVYCCr3
    ├── 25mYD6MyLNewXXdMVYCCr3.json
    ├── 25mYD6MyLNewXXdMVYCCr3.out
    └── 25mYD6MyLNewXXdMVYCCr3.pid
1 directory, 3 files

At a minimum, the directory will contain <id>.pid and <id>.json files.

  • <id>.json: A JSON file describing the process containing the following dictionary keys:
    • pid: Process PID
    • stdout: Redirected stdout file path for the process (redirected to <id>.out by default)
    • stderr: Redirected stderr file path for the process (stderr is redirected to stdout by default)
    • stdin: Redirected stdin file path for the process (interactive processes are not yet supported, stdin is currently always null)
    • returncode: Return code for the process (null if the process has not exited)
  • <id>.pid: A standard pidfile containing only the process PID
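Because the info file is plain JSON, process state can be read back without going through Celery at all. As a minimal illustration of the layout described above (`read_proc_info` is a hypothetical helper, not part of the dvc-task API):

```python
import json
from pathlib import Path


def read_proc_info(wdir, name):
    """Load the <id>.json info file for a managed process.

    Assumes the directory layout described above:
    <wdir>/<name>/<name>.json
    """
    info_path = Path(wdir) / name / f"{name}.json"
    with open(info_path, encoding="utf-8") as fobj:
        return json.load(fobj)
```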

ProcessManager instances can be created outside of a Celery task to manage and monitor processes as needed:

>>> from dvc_task.proc import ProcessManager
>>> manager = ProcessManager(wdir=".")
>>> names = [name for name, _info in manager.processes()]
>>> names
['25mYD6MyLNewXXdMVYCCr3']
>>> for line in manager.follow(names[0]):
...     print(line)
...
hello world

Celery Workers (dvc_task.worker)

dvc-task includes a pre-configured Celery worker (TemporaryWorker) which can be started from Python code. The TemporaryWorker will consume Celery tasks until the queue is empty. Once the queue is empty, the worker will wait up to a specified timeout for new tasks to be added to the queue. If the queue remains empty after the timeout expires, the worker will exit.

To instantiate a worker with a 60-second timeout, with the Celery worker name my-worker-1:

>>> from dvc_task.worker import TemporaryWorker
>>> worker = TemporaryWorker(my_app, timeout=60)
>>> worker.start("my-worker-1")

Note that worker.start runs the Celery worker within the calling thread.
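Because the call blocks, a common pattern is to launch the worker in a daemon thread so the application can continue. This is a generic sketch of that pattern; the `start_in_thread` helper is illustrative and not part of the dvc-task API:

```python
import threading


def start_in_thread(start_fn, worker_name):
    """Run a blocking start call (e.g. TemporaryWorker.start) in a
    daemon thread so it does not block the caller."""
    thread = threading.Thread(
        target=start_fn, args=(worker_name,), daemon=True
    )
    thread.start()
    return thread
```

With a TemporaryWorker, the thread finishes on its own once the queue stays empty past the timeout and the worker exits.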

Celery Applications (dvc_task.app)

dvc-task includes a pre-configured Celery application (FSApp) which uses the Kombu filesystem transport as the Celery broker along with the Celery filesystem results storage backend. FSApp is intended to be used in standalone Python applications where a traditional Celery producer/consumer setup (with the appropriate messaging and storage backends) is unavailable.

>>> from dvc_task.app import FSApp
>>> my_app = FSApp(wdir=".")

FSApp provides iterators for accessing Kombu messages which are either waiting in the queue or have already been processed. This allows the caller to access Celery task information without using the Celery inspect API (which is only functional when a Celery worker is actively running).

>>> for msg in my_app.iter_processed():
...     msg
<Message object at 0x102e7f0d0 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '0244c11a-1bcc-47fc-8587-66909a55fdc6', ...}>
<Message object at 0x1027fd4c0 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '491415d1-9527-493a-a5d7-88ed355da77c', ...}>
<Message object at 0x102e6f160 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'ea6ab7a4-0398-42ab-9f12-8da1f8e12a8a', ...}>
<Message object at 0x102e6f310 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '77c4a335-2102-4bee-9cb8-ef4d8ef9713f', ...}>

Contributing

Contributions are very welcome. To learn more, see the Contributor Guide.

License

Distributed under the terms of the Apache 2.0 license, dvc-task is free and open source software.

Issues

If you encounter any problems, please file an issue along with a detailed description.

dvc-task's People

Contributors

daavoo, dependabot[bot], github-actions[bot], karajan1001, pmrowla, pre-commit-ci[bot], skshetry


dvc-task's Issues

proc: process info files should be locked during writing

In certain situations it's possible that a reader tries to load proc info from the file while it's being written to, which causes a JSONDecodeError. ProcessManager should provide a minimal amount of synchronization for these cases.

Alternatively, at a minimum we should just write to temporary paths (i.e. output_path.json.<random suffix>) and then fs.move to replace the existing file in:

def _dump(self):
    self._make_wdir()
    with open(self.info_path, "w", encoding="utf-8") as fobj:
        json.dump(self.info.asdict(), fobj)
    with open(self.pidfile_path, "w", encoding="utf-8") as fobj:
        fobj.write(str(self.pid))
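The write-to-temp-then-move approach could be sketched as follows, relying on os.replace being atomic on both POSIX and Windows (this is an illustrative sketch, not the dvc-task implementation):

```python
import json
import os
import tempfile


def atomic_dump(data, path):
    """Write JSON to a temporary file in the same directory, then
    atomically replace the target, so a concurrent reader never
    observes a partially written file."""
    dir_name = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fobj:
            json.dump(data, fobj)
        # Atomic on the same filesystem; replaces any existing file.
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise
```

The temporary file must live in the same directory as the target so the final rename does not cross filesystem boundaries.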

no new releases since 0.0.2

In setup.cfg, there's a git dependency specified, which is currently blocking releases:

kombu@git+https://github.com/celery/kombu.git@0282e1419fad98da5ae956ff38c7e87e539889ac

WARNING  Error during upload. Retry with the --verbose option for more details.
ERROR    HTTPError: 400 Bad Request from https://upload.pypi.org/legacy/
         Invalid value for requires_dist. Error: Can't have direct dependency:
         'kombu @ git+https://github.com/celery/kombu.git@0282e1419fad98da5ae956ff38c7e87e539889ac'

I know we are waiting on kombu for the release, but I am creating this issue so that we can track it and figure out, if possible, a way to unblock the release. :)

celery: Filesystem transport does not support `celery.app.control`

Kombu FS broker does not include fanout support, so the celery control API cannot be used to monitor workers using the FS broker. We can work around this by using our own proc based wrapper to monitor our local workers, but it may be better to just add proper support for this upstream.

There is an existing but abandoned PR for this upstream that should be an OK starting point:

celery/kombu#1280

fsapp: implement app.inspect based `iter_active`

Currently we implement iter_queued and iter_processed for manually accessing tasks as raw messages via the FS kombu broker. (This is needed for when there is no worker process running, since celery.control.inspect requires talking to a worker via broadcast messages)

def iter_processed(
    self, queue: Optional[str] = None
) -> Generator[Message, None, None]:
    """Iterate over tasks which have been taken by a worker.

    Arguments:
        queue: Optional name of queue.
    """

Ideally, we should also have an inspect-based wrapper which can return actual active tasks via celery.inspect, for use when we have an active worker process.

fsapp: possible race condition w/kombu exchange handling

Looks like when running multiple worker processes (not concurrency within a single worker), it's possible to get simultaneous writes to reply.celery.pidbox.exchange, which ends up generating invalid JSON. This leads to JSON parsing errors when using inspect().

This is not reliably reproducible right now; it will need further investigation.

$ dvc queue status -v
Task     Name    Created    Status
664fb31          03:50 PM   Success
ab4a6b7          03:50 PM   Success
7db2881          03:50 PM   Success

2022-07-05 15:55:47,834 ERROR: unexpected error - Extra data: line 1 column 850 (char 849)
------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/pmrowla/git/dvc/dvc/cli/__init__.py", line 185, in main
    ret = cmd.do_run()
  File "/Users/pmrowla/git/dvc/dvc/cli/command.py", line 22, in do_run
    return self.run()
  File "/Users/pmrowla/git/dvc/dvc/commands/queue/status.py", line 40, in run
    worker_status = self.repo.experiments.celery_queue.worker_status()
  File "/Users/pmrowla/git/dvc/dvc/repo/experiments/queue/celery.py", line 349, in worker_status
    status = self.celery.control.inspect().active() or {}
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/celery/app/control.py", line 149, in active
    return self._request('active', safe=safe)
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/celery/app/control.py", line 106, in _request
    return self._prepare(self.app.control.broadcast(
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/celery/app/control.py", line 741, in broadcast
    return self.mailbox(conn)._broadcast(
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/pidbox.py", line 337, in _broadcast
    self._publish(command, arguments, destination=destination,
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/pidbox.py", line 299, in _publish
    maybe_declare(self.reply_queue(chan))
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/common.py", line 112, in maybe_declare
    return _maybe_declare(entity, channel)
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/common.py", line 152, in _maybe_declare
    entity.declare(channel=channel)
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/entity.py", line 608, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/entity.py", line 619, in _create_queue
    self.queue_bind(nowait=nowait, channel=channel)
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/entity.py", line 662, in queue_bind
    return self.bind_to(self.exchange, self.routing_key,
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/entity.py", line 671, in bind_to
    return (channel or self.channel).queue_bind(
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/transport/virtual/base.py", line 568, in queue_bind
    self._queue_bind(exchange, *meta)
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/transport/filesystem.py", line 193, in _queue_bind
    queues = self.get_table(exchange)
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/transport/filesystem.py", line 187, in get_table
    exchange_table = loads(bytes_to_str(f_obj.read()))
  File "/Users/pmrowla/.virtualenvs/example-get-started/lib/python3.9/site-packages/kombu/utils/json.py", line 97, in loads
    return _loads(s, object_hook=object_hook)
  File "/opt/homebrew/Cellar/[email protected]/3.9.13_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/json/__init__.py", line 359, in loads
    return cls(**kw).decode(s)
  File "/opt/homebrew/Cellar/[email protected]/3.9.13_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/json/decoder.py", line 340, in decode
    raise JSONDecodeError("Extra data", s, end)
json.decoder.JSONDecodeError: Extra data: line 1 column 850 (char 849)

Stale secret deletion

It looks like the following GitHub Actions secrets aren't being used in any of your workflows, at least on the default branch:

  • CODECOV_TOKEN

Please ping @iterative/security before 2023-09-22 if you'd like to preserve them.
