
tasktiger's Introduction

TaskTiger

https://github.com/closeio/tasktiger/actions/workflows/test.yaml/badge.svg?event=push

TaskTiger is a Python task queue using Redis.

(Interested in working on projects like this? Close is looking for great engineers to join our team)

  • Per-task fork or synchronous worker

    By default, TaskTiger forks a subprocess for each task. This comes with several benefits: memory leaks caused by tasks are avoided since the subprocess is terminated when the task is finished; a hard time limit can be set for each task, after which the task is killed if it hasn't completed; and, to maintain performance, any necessary Python modules can be preloaded in the parent process.

    TaskTiger also supports synchronous workers, which avoid forking overhead for better performance and allow tasks to reuse network connections. To prevent memory leaks from accumulating, workers can be set to shut down after a certain amount of time, at which point a supervisor can restart them. Workers also automatically exit on hard timeouts to prevent an inconsistent process state.

  • Unique queues

    TaskTiger has the option to avoid duplicate tasks in the task queue. In some cases it is desirable to combine multiple similar tasks. For example, imagine a task that indexes objects (e.g. to make them searchable). If an object is already present in the task queue and hasn't been processed yet, a unique queue will ensure that the indexing task doesn't have to do duplicate work. However, if the task is already running while it's queued, the task will be executed another time to ensure that the indexing task always picks up the latest state.

  • Task locks

    TaskTiger can ensure that no more than one instance of a task with similar arguments executes at a time by acquiring a lock. If a task hits a lock, it is requeued and scheduled for later execution after a configurable interval.

  • Task retrying

    TaskTiger lets you retry tasks that raise exceptions (either all exceptions or a list of specific ones) and comes with configurable retry intervals (fixed, linear, exponential, custom).

  • Flexible queues

    Tasks can be easily queued in separate queues. Workers pick tasks from a randomly chosen queue and can be configured to only process specific queues, ensuring that all queues are processed equally. TaskTiger also supports subqueues, which are separated by a period. For example, you can have per-customer queues in the form process_emails.CUSTOMER_ID and start a worker to process process_emails and any of its subqueues. Since tasks are picked from a random queue, all customers get equal treatment: if one customer is queueing many tasks, it can't block other customers' tasks from being processed. A maximum queue size can also be enforced.

  • Batch queues

    Batch queues can be used to combine multiple queued tasks into one. That way, your task function can process multiple sets of arguments at the same time, which can improve performance. The batch size is configurable.

  • Scheduled and periodic tasks

    Tasks can be scheduled for execution at a specific time. Tasks can also be executed periodically (e.g. every five seconds).

  • Structured logging

    TaskTiger supports JSON-style logging via structlog, allowing more flexibility for tools to analyze the log. For example, you can use TaskTiger together with Logstash, Elasticsearch, and Kibana.

    The structlog processor tasktiger.logging.tasktiger_processor can be used to inject the current task id into all log messages.

  • Reliability

    TaskTiger atomically moves tasks between queue states, and will re-execute tasks after a timeout if a worker crashes.

  • Error handling

    If an exception occurs during task execution and the task is not set up to be retried, TaskTiger stores the execution tracebacks in an error queue. The task can then be retried or deleted manually. TaskTiger can be easily integrated with error reporting services like Rollbar.

  • Admin interface

    A simple admin interface using flask-admin exists as a separate project (tasktiger-admin).

It is easy to get started with TaskTiger.

Create a file that contains the task(s).

# tasks.py
def my_task():
    print('Hello')

Queue the task using the delay method.

In [1]: import tasktiger, tasks
In [2]: tiger = tasktiger.TaskTiger()
In [3]: tiger.delay(tasks.my_task)

Run a worker (make sure the task code can be found, e.g. using PYTHONPATH).

% PYTHONPATH=. tasktiger
{"timestamp": "2015-08-27T21:00:09.135344Z", "queues": null, "pid": 69840, "event": "ready", "level": "info"}
{"task_id": "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79", "level": "debug", "timestamp": "2015-08-27T21:03:56.727051Z", "pid": 69840, "queue": "default", "child_pid": 70171, "event": "processing"}
Hello
{"task_id": "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79", "level": "debug", "timestamp": "2015-08-27T21:03:56.732457Z", "pid": 69840, "queue": "default", "event": "done"}

A TaskTiger object keeps track of TaskTiger's settings and is used to decorate and queue tasks. The constructor takes the following arguments:

  • connection

    Redis connection object. The connection should be initialized with decode_responses=True to avoid encoding problems on Python 3.

  • config

    Dict with config options. Most configuration options don't need to be changed, and a full list can be seen within TaskTiger's __init__ method.

    Here are a few commonly used options:

    • ALWAYS_EAGER

      If set to True, all tasks except future tasks (i.e. tasks whose when argument points to a future time) will be executed locally by blocking until the task returns. This is useful for testing purposes.

    • BATCH_QUEUES

      Set up queues that will be processed in batch, i.e. multiple jobs are taken out of the queue at the same time and passed as a list to the worker method. Takes a dict where the key represents the queue name and the value represents the batch size. Note that the task needs to be declared as batch=True. Also note that any subqueues will be automatically treated as batch queues, and the batch value of the most specific subqueue name takes precedence.

    • ONLY_QUEUES

      If set to a non-empty list of queue names, a worker only processes the given queues (and their subqueues), unless explicit queues are passed on the command line.

  • setup_structlog

    If set to True, sets up structured logging using structlog when initializing TaskTiger. This makes writing custom worker scripts easier since it doesn't require the user to set up structlog in advance.

Example:

import tasktiger
from redis import Redis
conn = Redis(db=1, decode_responses=True)
tiger = tasktiger.TaskTiger(connection=conn, config={
    'BATCH_QUEUES': {
        # Batch up to 50 tasks that are queued in the my_batch_queue or any
        # of its subqueues, except for the send_email subqueue which only
        # processes up to 10 tasks at a time.
        'my_batch_queue': 50,
        'my_batch_queue.send_email': 10,
    },
})

TaskTiger provides a task decorator to specify task options. Note that simple tasks don't need to be decorated. However, decorating the task allows you to use an alternative syntax to queue the task, which is compatible with Celery:

# tasks.py

import tasktiger
tiger = tasktiger.TaskTiger()

@tiger.task()
def my_task(name, n=None):
    print('Hello', name)

In [1]: import tasks
# The following are equivalent. However, the second syntax can only be used
# if the task is decorated.
In [2]: tasks.tiger.delay(tasks.my_task, args=('John',), kwargs={'n': 1})
In [3]: tasks.my_task.delay('John', n=1)

Tasks support a variety of options that can be specified either in the task decorator, or when queueing a task. For the latter, the delay method must be called on the TaskTiger object, and any options in the task decorator are overridden.

@tiger.task(queue='myqueue', unique=True)
def my_task():
    print('Hello')

# The task will be queued in "otherqueue", even though the task decorator
# says "myqueue".
tiger.delay(my_task, queue='otherqueue')

When queueing a task, the task needs to be defined in a module other than the Python file which is being executed. In other words, the task can't be in the __main__ module. Otherwise, TaskTiger will raise an error.

The following options are supported by both delay and the task decorator (a combined example sketch follows this list):

  • queue

    Name of the queue where the task will be queued.

  • hard_timeout

    If the task runs longer than the given number of seconds, it will be killed and marked as failed.

  • unique

    Boolean to indicate whether the task will only be queued if there is no similar task with the same function, arguments, and keyword arguments in the queue. Note that multiple similar tasks may still be executed at the same time since the task will still be inserted into the queue if another one is being processed. Requeueing an already scheduled unique task will not change the time it was originally scheduled to execute at.

  • unique_key

    If set, this implies unique=True and specifies the list of kwargs to use to construct the unique key. By default, all args and kwargs are serialized and hashed.

  • lock

    Boolean to indicate whether to hold a lock while the task is being executed (for the given args and kwargs). If a task with similar args/kwargs is queued and tries to acquire the lock, it will be retried later.

  • lock_key

    If set, this implies lock=True and specifies the list of kwargs to use to construct the lock key. By default, all args and kwargs are serialized and hashed.

  • max_queue_size

    A maximum queue size can be enforced by setting this to an integer value. The QueueFullException exception will be raised when queuing a task if this limit is reached. Tasks in the active, scheduled, and queued states are counted against this limit.

  • when

    Takes either a datetime (for an absolute date) or a timedelta (relative to now). If given, the task will be scheduled for the given time.

  • retry

    Boolean to indicate whether to retry the task when it fails (either because of an exception or because of a timeout). To restrict the list of failures, use retry_on. Unless retry_method is given, the configured DEFAULT_RETRY_METHOD is used.

  • retry_on

    If a list is given, it implies retry=True. The task will only be retried on the given exceptions (or their subclasses). To retry the task when a hard timeout occurs, use JobTimeoutException.

  • retry_method

    If given, implies retry=True. Pass either:

    • a function that takes the retry number as an argument, or,
    • a tuple (f, args), where f takes the retry number as the first argument, followed by the additional args.

    The function needs to return the desired retry interval in seconds, or raise StopRetry to stop retrying. The following built-in functions can be passed for common scenarios and return the appropriate tuple:

    • fixed(delay, max_retries)

      Returns a method that returns the given delay (in seconds) or raises StopRetry if the number of retries exceeds max_retries.

    • linear(delay, increment, max_retries)

      Like fixed, but starts off with the given delay and increments it by the given increment after every retry.

    • exponential(delay, factor, max_retries)

      Like fixed, but starts off with the given delay and multiplies it by the given factor after every retry.

    For example, to retry a task 3 times (for a total of 4 executions), and wait 60 seconds between executions, pass retry_method=fixed(60, 3).

  • runner_class

    If given, a Python class can be specified to influence task running behavior. The runner class should inherit tasktiger.runner.BaseRunner and implement the task execution behavior. The default implementation is available in tasktiger.runner.DefaultRunner. The following behavior can be achieved:

    • Execute specific code before or after the task is executed (in the forked child process), or customize the way task functions are called in either single or batch processing.

      Note that if you want to execute specific code for all tasks, you should use the CHILD_CONTEXT_MANAGERS configuration option.

    • Control the hard timeout behavior of a task.

    • Execute specific code in the main worker process after a task fails permanently.

    This is an advanced feature and the interface and requirements of the runner class can change in future TaskTiger versions.
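
For illustration, here is a hedged sketch that combines several of the options above. The function, the queue names, and the exception to retry on are made up for this example:

# tasks.py
import datetime

import tasktiger
from tasktiger.retry import fixed

tiger = tasktiger.TaskTiger()

@tiger.task(queue='emails', hard_timeout=300, unique=True,
            retry_on=[ConnectionError], retry_method=fixed(60, 3))
def send_welcome_email(user_id):
    # Illustrative task body; look up the user and send the email here.
    print('Sending welcome email to', user_id)

# Elsewhere (e.g. in a shell or view), queue the task to run in ten minutes.
# Options passed to delay override the decorator, so this particular run
# goes to the "urgent_emails" queue.
tiger.delay(send_welcome_email, args=(42,),
            queue='urgent_emails',
            when=datetime.timedelta(minutes=10))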

The following options can only be specified in the task decorator:

  • batch

    If set to True, the task will receive a list of dicts with args and kwargs and can process multiple tasks of the same type at once. Example: [{"args": [1], "kwargs": {}}, {"args": [2], "kwargs": {}}] Note that the list will only contain multiple items if the worker has set up BATCH_QUEUES for the specific queue (see the Configuration section).

  • schedule

    If given, makes a task execute periodically. Pass either:

    • a function that takes the current datetime as an argument.
    • a tuple (f, args), where f takes the current datetime as the first argument, followed by the additional args.

    The schedule function must return the next task execution datetime, or None to prevent periodic execution. The function is executed to determine the initial task execution date when a worker is initialized, and to determine the next execution date when the task is about to get executed.

    For most common scenarios, the built-in functions below can be passed (a short schedule sketch follows this list):

    • periodic(seconds=0, minutes=0, hours=0, days=0, weeks=0, start_date=None, end_date=None)

      Use equal, periodic intervals, starting from start_date (defaults to 2000-01-01T00:00Z, a Saturday, if not given), ending at end_date (or never, if not given). For example, to run a task every five minutes indefinitely, use schedule=periodic(minutes=5). To run a task every Sunday at 4am UTC, you could use schedule=periodic(weeks=1, start_date=datetime.datetime(2000, 1, 2, 4)).

    • cron_expr(expr, start_date=None, end_date=None)

      Schedule the task according to a cron expression. start_date specifies the periodic task's start date and defaults to 2000-01-01T00:00Z, a Saturday, if not given; end_date specifies the end date, and the task repeats forever if it is not given. For example, to run a task every hour indefinitely, use schedule=cron_expr("0 * * * *"). To run a task every Sunday at 4am UTC, you could use schedule=cron_expr("0 4 * * 0").
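
As a short sketch of the schedule option (the task body and queue name are illustrative), a task that runs every five minutes could be declared like this:

# tasks.py
import tasktiger
from tasktiger.schedule import periodic

tiger = tasktiger.TaskTiger()

@tiger.task(schedule=periodic(minutes=5), queue='periodic')
def refresh_cache():
    # Periodic tasks are scheduled automatically when a worker that imports
    # this module starts up; there is no need to call delay() explicitly.
    print('Refreshing cache')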

In some cases the task retry options may not be flexible enough. For example, you might want to use a different retry method depending on the exception type, or you might want to suppress logging an error if a task fails after retries. In these cases, RetryException can be raised within the task function. The following options are supported:

  • method

    Specify a custom retry method for this retry. If not given, the task's default retry method is used, or, if unspecified, the configured DEFAULT_RETRY_METHOD. Note that the number of retries passed to the retry method is always the total number of times this method has been executed, regardless of which retry method was used.

  • original_traceback

    If RetryException is raised from within an except block and original_traceback is True, the original traceback will be logged (i.e. the stacktrace at the place where the caught exception was raised). False by default.

  • log_error

    If set to False and the task fails permanently, a warning will be logged instead of an error, and the task will be removed from Redis when it completes. True by default.

Example usage:

from tasktiger.exceptions import RetryException
from tasktiger.retry import exponential, fixed

def my_task():
    if not ready():
        # Retry every minute up to 3 times if we're not ready. An error will
        # be logged if we're out of retries.
        raise RetryException(method=fixed(60, 3))

    try:
        some_code()
    except NetworkException:
        # Back off exponentially up to 5 times in case of a network failure.
        # Log the original traceback (as a warning) and don't log an error if
        # we still fail after 5 times.
        raise RetryException(method=exponential(60, 2, 5),
                             original_traceback=True,
                             log_error=False)

The tasktiger command is used on the command line to invoke a worker. To invoke multiple workers, multiple instances need to be started. This can be easily done e.g. via Supervisor. The following Supervisor configuration file can be placed in /etc/supervisor/tasktiger.ini and runs 4 TaskTiger workers as the ubuntu user. For more information, read Supervisor's documentation.

[program:tasktiger]
command=/usr/local/bin/tasktiger
process_name=%(program_name)s_%(process_num)02d
numprocs=4
numprocs_start=0
priority=999
autostart=true
autorestart=true
startsecs=10
startretries=3
exitcodes=0,2
stopsignal=TERM
stopwaitsecs=600
killasgroup=false
user=ubuntu
redirect_stderr=false
stdout_logfile=/var/log/tasktiger.out.log
stdout_logfile_maxbytes=250MB
stdout_logfile_backups=10
stderr_logfile=/var/log/tasktiger.err.log
stderr_logfile_maxbytes=250MB
stderr_logfile_backups=10

Workers support the following options (an example invocation follows the list):

  • -q, --queues

    If specified, only the given queue(s) are processed. Multiple queues can be separated by comma. Any subqueues of the given queues will be also processed. For example, -q first,second will process items from first, second, and subqueues such as first.CUSTOMER1, first.CUSTOMER2.

  • -e, --exclude-queues

    If specified, exclude the given queue(s) from processing. Multiple queues can be separated by comma. Any subqueues of the given queues will also be excluded unless a more specific queue is specified with the -q option. For example, -q email,email.incoming.CUSTOMER1 -e email.incoming will process items from the email queue and subqueues like email.outgoing.CUSTOMER1 or email.incoming.CUSTOMER1, but not email.incoming or email.incoming.CUSTOMER2.

  • -m, --module

    Module(s) to import when launching the worker. This improves task performance since the module doesn't have to be reimported every time a task is forked. Multiple modules can be separated by comma.

    Another way to preload modules is to set up a custom TaskTiger launch script, which is described below.

  • -h, --host

    Redis server hostname (if different from localhost).

  • -p, --port

    Redis server port (if different from 6379).

  • -a, --password

    Redis server password (if required).

  • -n, --db

    Redis server database number (if different from 0).

  • -M, --max-workers-per-queue

    Maximum number of workers that are allowed to process a queue.

  • --store-tracebacks/--no-store-tracebacks

    Store tracebacks with execution history (config defaults to True).

  • --executor

    Can be fork (default) or sync. Whether to execute tasks in a separate process via fork, or execute them synchronously in the same process. See the "Features" section for the benefits of either approach.

  • --exit-after

    Exit the worker after the time in minutes has elapsed. This is mainly useful with the synchronous executor to prevent memory leaks from accumulating.
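
For example, a worker that only processes the emails queue (and its subqueues), preloads a tasks module, allows at most two workers per queue, and exits after an hour could be started as follows (the queue and module names are illustrative):

% PYTHONPATH=. tasktiger -q emails -m tasks -M 2 --exit-after 60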

In some cases it is convenient to have a custom TaskTiger launch script. For example, your application may have a manage.py command that sets up the environment and you may want to launch TaskTiger workers using that script. To do that, you can use the run_worker_with_args method, which launches a TaskTiger worker and parses any command line arguments. Here is an example:

import sys
from tasktiger import TaskTiger

try:
    command = sys.argv[1]
except IndexError:
    command = None

if command == 'tasktiger':
    tiger = TaskTiger(setup_structlog=True)
    # Strip the "tasktiger" arg when running via manage, so we can run e.g.
    # ./manage.py tasktiger --help
    tiger.run_worker_with_args(sys.argv[2:])
    sys.exit(0)

TaskTiger provides access to the Task class which lets you inspect queues and perform various actions on tasks.

Each queue can have tasks in the following states:

  • queued: Tasks that are queued and waiting to be picked up by the workers.
  • active: Tasks that are currently being processed by the workers.
  • scheduled: Tasks that are scheduled for later execution.
  • error: Tasks that failed with an error.

To get a list of all tasks for a given queue and state, use Task.tasks_from_queue. The method gives you back a tuple containing the total number of tasks in the queue (useful if the tasks are truncated) and a list of tasks in the queue, latest first. Using the skip and limit keyword arguments, you can fetch arbitrary slices of the queue. If you know the task ID, you can fetch a given task using Task.from_id. Both methods let you load tracebacks from failed task executions using the load_executions keyword argument, which accepts an integer indicating how many executions should be loaded.

Tasks can also be constructed and queued using the regular constructor, which takes the TaskTiger instance, the function name and the options described in the Task options section. The task can then be queued using its delay method. Note that the when argument needs to be passed to the delay method, if applicable. Unique tasks can be reconstructed using the same arguments.

The Task object has the following properties:

  • id: The task ID.
  • data: The raw data as a dict from Redis.
  • executions: A list of failed task executions (as dicts). An execution dict contains the processing time in time_started and time_failed, the worker host in host, the exception name in exception_name and the full traceback in traceback.
  • serialized_func, args, kwargs: The serialized function name with all of its arguments.
  • func: The imported (executable) function.

The Task object has the following methods:

  • cancel: Cancel a scheduled task.
  • delay: Queue the task for execution.
  • delete: Remove the task from the error queue.
  • execute: Run the task without queueing it.
  • n_executions: Queries and returns the number of past task executions.
  • retry: Requeue the task from the error queue for execution.
  • update_scheduled_time: Updates a scheduled task's date to the given date.

The current task can be accessed within the task function while it's being executed: in the case of a non-batch task, the current_task property of the TaskTiger instance returns the current Task instance. In the case of a batch task, the current_tasks property must be used; it returns a list of the tasks that are currently being processed (in the same order as they were passed to the task).

Example 1: Queueing a unique task and canceling it without a reference to the original task.

import datetime

from tasktiger import TaskTiger, Task

tiger = TaskTiger()

# Send an email in five minutes.
task = Task(tiger, send_mail, args=['email_id'], unique=True)
task.delay(when=datetime.timedelta(minutes=5))

# Unique tasks get back a task instance referring to the same task by simply
# creating the same task again.
task = Task(tiger, send_mail, args=['email_id'], unique=True)
task.cancel()

Example 2: Inspecting queues and retrying a task by ID.

from tasktiger import TaskTiger, Task

QUEUE_NAME = 'default'
TASK_STATE = 'error'
TASK_ID = '6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79'

tiger = TaskTiger()

n_total, tasks = Task.tasks_from_queue(tiger, QUEUE_NAME, TASK_STATE)

for task in tasks:
    print(task.id, task.func)

task = Task.from_id(tiger, QUEUE_NAME, TASK_STATE, TASK_ID)
task.retry()

Example 3: Accessing the task instances within a batch task function to determine how many times the currently processing tasks were previously executed.

from tasktiger import TaskTiger

tiger = TaskTiger()

@tiger.task(batch=True)
def my_task(args):
    for task in tiger.current_tasks:
        print(task.n_executions())

The --max-workers-per-queue option uses queue locks to control the number of workers that can simultaneously process the same queue. When using this option a system lock can be placed on a queue which will keep workers from processing tasks from that queue until it expires. Use the set_queue_system_lock() method of the TaskTiger object to set this lock.
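
A minimal sketch, assuming set_queue_system_lock takes the queue name and a lock duration in seconds (check the method's signature in your TaskTiger version; the queue name is illustrative):

from tasktiger import TaskTiger

tiger = TaskTiger()

# Assumption: the second argument is the number of seconds to hold the system
# lock; workers will skip the "emails" queue until the lock expires.
tiger.set_queue_system_lock('emails', 300)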

TaskTiger comes with Rollbar integration for error handling. When a task errors out, it can be logged to Rollbar, grouped by queue, task function name and exception type. To enable logging, initialize rollbar with the StructlogRollbarHandler provided in the tasktiger.rollbar module. The handler takes a string as an argument which is used to prefix all the messages reported to Rollbar. Here is a custom worker launch script:

import logging
import rollbar
import sys
from tasktiger import TaskTiger
from tasktiger.rollbar import StructlogRollbarHandler

tiger = TaskTiger(setup_structlog=True)

rollbar.init(ROLLBAR_API_KEY, APPLICATION_ENVIRONMENT,
             allow_logging_basic_config=False)
rollbar_handler = StructlogRollbarHandler('TaskTiger')
rollbar_handler.setLevel(logging.ERROR)
tiger.log.addHandler(rollbar_handler)

tiger.run_worker_with_args(sys.argv[1:])

Errored tasks occasionally need to be purged from Redis, so TaskTiger exposes a purge_errored_tasks method to help. It might be useful to set this up as a periodic task as follows:

import datetime

from tasktiger import TaskTiger, periodic

tiger = TaskTiger()

@tiger.task(schedule=periodic(hours=1))
def purge_errored_tasks():
    tiger.purge_errored_tasks(
        limit=1000,
        last_execution_before=(
            datetime.datetime.utcnow() - datetime.timedelta(weeks=12)
        )
    )

Tests can be run locally using the provided docker compose file. After installing docker, tests should be runnable with:

docker-compose run --rm tasktiger pytest

Tests can be more granularly run using normal pytest flags. For example:

docker-compose run --rm tasktiger pytest tests/test_base.py::TestCase

To release a new version:

  1. Make sure the code has been thoroughly reviewed and tested in a realistic production environment.
  2. Update setup.py and CHANGELOG.md. Make sure you include any breaking changes.
  3. Run python setup.py sdist and twine upload dist/<PACKAGE_TO_UPLOAD>.
  4. Push a new tag pointing to the released commit, format: v0.13 for example.
  5. Mark the tag as a release in GitHub's UI and include in the description the changelog entry for the version. An example would be: https://github.com/closeio/tasktiger/releases/tag/v0.13.

tasktiger's Issues

Optimize scheduled task queueing

When there are many queues, _worker_queue_scheduled_tasks is looping through all the scheduled queues after every worker run, which causes a lot of Redis overhead.

only a single task can execute before tasktiger errors out

Hi,

I'm having trouble getting even the basic hello world example running cleanly on my system. The task does execute, but I get a massive stack trace about an exception that occurred in the Lua code. After the exception is printed, the tasktiger process exits. Patching the module to add an exception handler in redis_scripts.py works just fine in that multiple tasks can execute without the main process exiting, but it seems kinda hacky and the console gets flooded with "event": "failing expired task" errors.

Here's my specs (Ubuntu 16.04):

$ pip freeze
...
redis==2.10.6
tasktiger==0.9.1
$ redis-server -v
Redis server v=3.0.6 sha=00000000:0 malloc=jemalloc-3.6.0 bits=64 build=687a2a319020fa42

My code:

# queuer.py
from tasky import my_task
import tasktiger

tiger = tasktiger.TaskTiger()
tiger.delay(my_task)

# tasky.py
def my_task():
    print "HELLO WORLD\ngoodbye"

The error:

$ PYTHONPATH=. tasktiger
{"level": "info", "timestamp": "2017-09-04T00:20:50.817296Z", "queues": [], "pid": 16069, "exclude_queues": [], "event": "ready"}
{"level": "debug", "timestamp": "2017-09-04T00:20:54.226146Z", "pid": 16069, "queue": "default", "params": [{"args": [], "task_id": "ac2ebf92129241a44ea8a91d667cda3cf4d948e50f8ddd45daf638ebb277cab5", "kwargs": {}}], "child_pid": 16076, "func": "tasky:my_task", "event": "processing"}
HELLO WORLD
goodbye
{"exception": "Traceback (most recent call last):\n  File \"/home/ancat/.local/lib/python2.7/site-packages/tasktiger/worker.py\", line 800, in run\n    self._worker_run()\n  File \"/home/ancat/.local/lib/python2.7/site-packages/tasktiger/worker.py\", line 712, in _worker_run\n    if not self._process_from_queue(queue):\n  File \"/home/ancat/.local/lib/python2.7/site-packages/tasktiger/worker.py\", line 519, in _process_from_queue\n    self._finish_task_processing(queue, task, success)\n  File \"/home/ancat/.local/lib/python2.7/site-packages/tasktiger/worker.py\", line 610, in _finish_task_processing\n    _mark_done()\n  File \"/home/ancat/.local/lib/python2.7/site-packages/tasktiger/worker.py\", line 606, in _mark_done\n    task._move(from_state=ACTIVE)\n  File \"/home/ancat/.local/lib/python2.7/site-packages/tasktiger/task.py\", line 240, in _move\n    scripts.execute_pipeline(pipeline)\n  File \"/home/ancat/.local/lib/python2.7/site-packages/tasktiger/redis_scripts.py\", line 495, in execute_pipeline\n    raise e\nResponseError: Error running script (call to f_8c9e36483ae952c86a059230793221dd84b0e9d0): @enable_strict_lua:15: user_script:46: Script attempted to access unexisting global variable 'f_da73dc442ac4ec1343b0cd6eb3af5afe8204adba' ", "pid": 16069, "timestamp": "2017-09-04T00:20:54.237918Z", "level": "error"}
{"timestamp": "2017-09-04T00:20:54.240324Z", "pid": 16069, "event": "done", "level": "info"}
Traceback (most recent call last):
  File "/home/ancat/.local/bin/tasktiger", line 11, in <module>
    sys.exit(run_worker())
  File "/home/ancat/.local/lib/python2.7/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/home/ancat/.local/lib/python2.7/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/home/ancat/.local/lib/python2.7/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/ancat/.local/lib/python2.7/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home/ancat/.local/lib/python2.7/site-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/home/ancat/.local/lib/python2.7/site-packages/tasktiger/__init__.py", line 366, in run_worker
    tiger.run_worker(**kwargs)
  File "/home/ancat/.local/lib/python2.7/site-packages/tasktiger/__init__.py", line 296, in run_worker
    worker.run()
  File "/home/ancat/.local/lib/python2.7/site-packages/tasktiger/worker.py", line 800, in run
    self._worker_run()
  File "/home/ancat/.local/lib/python2.7/site-packages/tasktiger/worker.py", line 712, in _worker_run
    if not self._process_from_queue(queue):
  File "/home/ancat/.local/lib/python2.7/site-packages/tasktiger/worker.py", line 519, in _process_from_queue
    self._finish_task_processing(queue, task, success)
  File "/home/ancat/.local/lib/python2.7/site-packages/tasktiger/worker.py", line 610, in _finish_task_processing
    _mark_done()
  File "/home/ancat/.local/lib/python2.7/site-packages/tasktiger/worker.py", line 606, in _mark_done
    task._move(from_state=ACTIVE)
  File "/home/ancat/.local/lib/python2.7/site-packages/tasktiger/task.py", line 240, in _move
    scripts.execute_pipeline(pipeline)
  File "/home/ancat/.local/lib/python2.7/site-packages/tasktiger/redis_scripts.py", line 495, in execute_pipeline
    raise e
redis.exceptions.ResponseError: Error running script (call to f_8c9e36483ae952c86a059230793221dd84b0e9d0): @enable_strict_lua:15: user_script:46: Script attempted to access unexisting global variable 'f_da73dc442ac4ec1343b0cd6eb3af5afe8204adba'

Tasks stuck in active state for days

I've noticed some tasks are getting stuck in state active.

>>> count, tasks = Task.tasks_from_queue(tiger, queue + '.' + subqueue, 'active')
>>> count
1
>>> tasks
[<Task <function archive_data_for_user_and_month at 0x7fb18d5c8598>>]
>>> tasks[0]._data.get('unique')
True
>>> tasks[0]._data.get('time_last_queued')
1532891849.573912
>>> tasks[0]._ts
datetime.datetime(2018, 7, 30, 2, 37, 2, 150757)
>>> datetime.utcnow()
datetime.datetime(2018, 8, 2, 7, 55, 27, 503967)

Any idea as to a cause, and should I periodically look for and retry these tasks or should this be fixed inside tasktiger?

Docs: options are unclear

Options are unclear because:

  • the heading The following options are supported for delay: seems to apply to more than just delay
  • some options (e.g. unique, lock) don't indicate what data type (e.g. boolean) is expected

setup_structlog = True breaking TaskTiger initialization

Setting setup_structlog to True when initializing TaskTiger in my manage.py is breaking in TaskTiger 0.9.5, ex: tiger = TaskTiger(setup_structlog=True)

The root cause seems to be line 201 in __init__.py, where it's saying the logging module doesn't have a DEBUG attribute ("AttributeError: 'module' object has no attribute 'DEBUG'"):

if setup_structlog:
    self.log.setLevel(logging.DEBUG)

Below is my log output:

06:55:48 tiger_worker2.1 | Traceback (most recent call last):
06:55:48 tiger_worker2.1 | File "manage.py", line 23, in
06:55:48 worker.1 | Traceback (most recent call last):
06:55:48 tiger_worker2.1 | tiger = TaskTiger(setup_structlog=True)
06:55:48 worker.1 | File "manage.py", line 23, in
06:55:48 tiger_worker2.1 | File "/Users/uniacid/Sites/Xivg/cleanse.studio/env/lib/python2.7/site-packages/tasktiger/init.py", line 201, in init
06:55:48 web.1 | Traceback (most recent call last):
06:55:48 tiger_worker3.1 | Traceback (most recent call last):
06:55:48 cleanse_worker.1 | Traceback (most recent call last):
06:55:48 worker.1 | tiger = TaskTiger(setup_structlog=True)
06:55:48 tiger_worker1.1 | Traceback (most recent call last):
06:55:48 tiger_worker2.1 | self.log.setLevel(logging.DEBUG)
06:55:48 web.1 | File "manage.py", line 23, in
06:55:48 tiger_worker3.1 | File "manage.py", line 23, in
06:55:48 cleanse_worker.1 | File "manage.py", line 23, in
06:55:48 worker.1 | File "/Users/uniacid/Sites/Xivg/cleanse.studio/env/lib/python2.7/site-packages/tasktiger/init.py", line 201, in init
06:55:48 tiger_worker1.1 | File "manage.py", line 23, in
06:55:48 tiger_worker2.1 | AttributeError: 'module' object has no attribute 'DEBUG'
06:55:48 web.1 | tiger = TaskTiger(setup_structlog=True)
06:55:48 tiger_worker3.1 | tiger = TaskTiger(setup_structlog=True)
06:55:48 cleanse_worker.1 | tiger = TaskTiger(setup_structlog=True)
06:55:48 worker.1 | self.log.setLevel(logging.DEBUG)
06:55:48 tiger_worker1.1 | tiger = TaskTiger(setup_structlog=True)
06:55:48 tiger_worker2.1 | DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): sentry.io:443
06:55:48 web.1 | File "/Users/uniacid/Sites/Xivg/cleanse.studio/env/lib/python2.7/site-packages/tasktiger/init.py", line 201, in init
06:55:48 tiger_worker3.1 | File "/Users/uniacid/Sites/Xivg/cleanse.studio/env/lib/python2.7/site-packages/tasktiger/init.py", line 201, in init
06:55:48 cleanse_worker.1 | File "/Users/uniacid/Sites/Xivg/cleanse.studio/env/lib/python2.7/site-packages/tasktiger/init.py", line 201, in init
06:55:48 worker.1 | AttributeError: 'module' object has no attribute 'DEBUG'
06:55:48 tiger_worker1.1 | File "/Users/uniacid/Sites/Xivg/cleanse.studio/env/lib/python2.7/site-packages/tasktiger/init.py", line 201, in init
06:55:48 web.1 | self.log.setLevel(logging.DEBUG)
06:55:48 tiger_worker3.1 | self.log.setLevel(logging.DEBUG)
06:55:48 cleanse_worker.1 | self.log.setLevel(logging.DEBUG)
06:55:48 worker.1 | DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): sentry.io:443
06:55:48 tiger_worker1.1 | self.log.setLevel(logging.DEBUG)
06:55:48 web.1 | AttributeError: 'module' object has no attribute 'DEBUG'
06:55:48 tiger_worker3.1 | AttributeError: 'module' object has no attribute 'DEBUG'
06:55:48 cleanse_worker.1 | AttributeError: 'module' object has no attribute 'DEBUG'
06:55:48 tiger_worker1.1 | AttributeError: 'module' object has no attribute 'DEBUG'

Add limit for maximum number of active workers per queue

Replace existing single worker queue setting with a new setting that allows the maximum number of workers per queue. Setting this value to 1 would duplicate the existing single worker queue functionality.

Most likely depends on #93 to be completed first. Should also consider if a token bucket implementation would be better than semaphore locking.

Add functionality to cleanup errored tasks

Failed tasks can be left in the ERROR state and must be manually removed. There should be an easy way to purge old errored tasks.

  • Add method purge_errored_tasks(queues=None, exclude_queues=None, last_execution_before=None) to the TaskTiger class
    • queues and exclude_queues should function similar to the way the Worker class processes them, using config values if they aren't specified in the function call
    • last_execution_before: seconds since the epoch that can be used to only delete errored tasks older than the specified age, OR we could specify the age based on the last execution in seconds (e.g. tasks last executed more than 12 weeks ago = 7,257,600 seconds)
  • Once tasks are identified their delete() method can be called to remove them.
  • Might want to include a way to slow down the deletes in case there are millions to process and the caller doesn't want to impact Redis

Log task's execution time

Right now it's very hard to scan the logs and determine how much time was spent executing a given task.

Add duration to these log statements. I think now() - task._ts should be close enough since _ts is set here. Might need a little investigation.

Option to set task expiry time

Defines an optional expiry time; the task should not be executed after the expiry time. When a worker receives an expired task, it will ignore the task or mark the task state as EXPIRED.

Our scenario: we need to send coupon expiration notifications to our customers via e-mail, SMS, etc. The worker should never send the notification when the coupon has already expired.

New install doesn't pull requirements

$ pip install tasktiger
Collecting tasktiger
Installing collected packages: tasktiger
Successfully installed tasktiger-0.2

$ tasktiger
Traceback (most recent call last):
  File "/Users/nolan/.virtualenvs/tt2/bin/tasktiger", line 7, in <module>
    from tasktiger import run_worker
  File "/Users/nolan/.virtualenvs/tt2/lib/python2.7/site-packages/tasktiger/__init__.py", line 2, in <module>
    import click
ImportError: No module named click

The same occurs after cloning the repository and running python setup.py install. It appears setup.py is missing an install_requires directive for those libraries in requirements.txt.

Periodic tasks not working

I can't seem to get the periodic tasks to execute, or show up in the 'scheduled' queue.

Using the following tasks.py for debug:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import redis
from tasktiger import TaskTiger
from tasktiger.schedule import periodic

tiger = TaskTiger()


@tiger.task(schedule=periodic(seconds=1), queue='periodic')
def periodic_task():
    """Periodic task."""
    conn = redis.Redis(decode_responses=True)
    conn.incr('period_count', 1)


@tiger.task()
def test_task():
    """Minimal task."""
    print('Hello')

Running tasktiger as shown in docs:

$ PYTHONPATH=. tasktiger

I can run the minimal test_task:

>>> import tasks
>>> tasks.test_task.delay()
<Task <function test_task at 0x7f03aaacfd08>>

I see the output in the log, but there is no evidence of the periodic_task and the redis key 'period_count' never increments:

{"pid": 27295, "queues": [], "exclude_queues": [], "single_worker_queues": [], "event": "ready", "level": "info", "timestamp": "2017-10-26T12:43:35.236665Z"}
{"pid": 27295, "queue": "default", "event": "new queue", "level": "debug", "timestamp": "2017-10-26T12:44:09.818470Z"}
{"pid": 27295, "queue": "default", "src_queue": "queued", "dest_queue": "active", "qty": 1, "event": "moved tasks", "level": "debug", "timestamp": "2017-10-26T12:44:09.819402Z"}
{"pid": 27295, "queue": "default", "child_pid": 27318, "func": "tasks:test_task", "task_id": "c44b198caac44250d06fc5dfc3249960d9eb5bfbe76d8bb848ddb419c8382323", "params": {"args": [], "kwargs": {}}, "event": "processing", "level": "info", "timestamp": "2017-10-26T12:44:09.820179Z"}
Hello
{"pid": 27295, "queue": "default", "attempted": 1, "processed": 1, "event": "processed", "level": "debug", "timestamp": "2017-10-26T12:44:09.823499Z"}
{"pid": 27295, "queue": "default", "task_id": "c44b198caac44250d06fc5dfc3249960d9eb5bfbe76d8bb848ddb419c8382323", "event": "done", "level": "info", "timestamp": "2017-10-26T12:44:09.825171Z"}
{"pid": 27295, "queue": "default", "src_queue": "queued", "dest_queue": "active", "qty": 0, "event": "moved tasks", "level": "debug", "timestamp": "2017-10-26T12:44:09.825592Z"}
{"pid": 27295, "time_total": 60.00032615661621, "time_busy": 0.003787517547607422, "utilization": 0.006312494931645924, "event": "stats", "level": "info", "timestamp": "2017-10-26T12:44:35.237246Z"}

Am I missing something?

By the way, TaskTiger is a great project!

LICENSE file missing

While setup.py notes that the package is released under the MIT license, an actual LICENSE file is missing. It would help a lot to convince others to use it if you'd add one.

Pre/post task execution hooks?

Does tasktiger have anything analogous to Celery's task_prerun and task_postrun signals? See: http://docs.celeryproject.org/en/latest/userguide/signals.html#task-prerun

I work on a multi-tenant application that needs to be able to switch databases whenever an asynchronous task runs. In order to do this in Celery, we place a piece of metadata into the task's kwargs when it is queued, and then pop it off in a handler function when task_prerun is fired.

Assuming this isn't currently available, I think I have an idea how to add a simple version. Let me know if this is a feature you'd like to see.

Worker concurrency

Currently, a TaskTiger worker can process only one task at a time. Is it possible to have one worker prefork multiple pool subprocesses so that it could process multiple tasks at the same time?

In our case, we need to preload some big modules (which take a long time to load and use a significant amount of memory) in the parent process for performance. Starting multiple workers consumes too much memory; this problem could be fixed if TaskTiger supported worker concurrency.

StrictRedis causes redis.exceptions.ResponseError

I'm using django-rq and, by default, it uses StrictRedis, which has a different, more correct API than the legacy Redis client class.

I can get around this by changing the REDIS_CLIENT_CLASS config in my django app.

'OPTIONS': {
  "REDIS_CLIENT_CLASS": "redis.client.Redis",
  'CONNECTION_POOL_KWARGS': {
    "decode_responses": True,
  }
},

However, if I use the StrictRedis client, I get the following error whenever a task is moved from ACTIVE to ERROR.

redis.exceptions.ResponseError: Command # 2 (ZADD t:error:default c0d4d2806d10037e0140b18ca3e8ec3f53f1e81812334ce833a4030cfec3bf69 1492030805.5607371) of

You can reproduce by using StrictRedis and forcing any task to fail.

For reference, this is similar to a this rq issue: rq/rq-scheduler#16.

I'm happy to submit a PR if this is something you want to support. Thanks for the great project!

Add functionality to purge periodic task execution histories

#137 addresses purging tasks in an error state. But long running periodic tasks can have an ever increasing list of execution errors that will never get purged. A function should be provided that can be used to trim the execution list for periodic tasks so that they don't have unbounded growth.

Another idea would be to have a max executions config setting and do a LTRIM every time we add to the execution list here:

self.connection.rpush(
    self._key('task', task.id, 'executions'),
    serialized_execution,
)

Examples

Could we get some examples? If not in an examples directory, then in the documentation. For example, I'm having a hell of a time getting lock_key to actually work the way I think it is supposed to work. I have a number of things to process and I want to make sure that I don't process any with the same email address at the same time (so they don't stomp on each other).

TypeError when loading task json data

Traceback (most recent call last):
  File "./test.py", line 4, in purge_tasktiger_error_queues
    count, tasks = Task.tasks_from_queue(tiger, queue, ERROR)
  File "./venv/lib/python3.5/site-packages/tasktiger/task.py", line 393, in tasks_from_queue
    data = json.loads(serialized_data)
  File "/usr/lib/python3.5/json/__init__.py", line 312, in loads
    s.__class__.__name__))
TypeError: the JSON object must be str, not 'NoneType'

Improve Redis Lock

There are a number of improvements that can be made to the existing redis_lock.Lock.

  • Support a Semaphore locking construct to allow orchestrating multiple workers (added redis_semaphore.py)
  • Renewing fails for non-lock owners
  • Include lock ID or other metadata to identify who holds the lock
  • Don't leave orphaned keys #53

Update: We most likely just need to switch to the Lock provided in redis-py

Rewrite Task._move in Lua

scripts.fail_if_not_in_zset doesn't actually abort the transaction as I expected, we therefore need to rewrite the entire Task._move function in Lua to prevent consistency issues when moving tasks between states.

Move Documentation to readthedocs

Right now all the documentation for this library is in README.md. It's getting pretty unwieldy at this point, and should really probably be moved into a hosted page somewhere.

I've used readthedocs.org before, and it works pretty well. We'll have to refactor the README documentation into sphinx documentation first, but that shouldn't be too tricky.

Tasks:

  • refactor documentation into a docs/ folder, using sphinx
  • enable/connect readthedocs
  • update README.md
    • to point to the hosted documentation
    • to only have a feature overview and basic quickstart
  • (maybe) update repo description with the docs link

Doesn't work with Redis 3.2.1

redis.exceptions.ResponseError: Error running script (call to f_cba34df333464da2eaf6e07f32e9d467732a718f): @user_script:69: @user_script: 69: Lua redis() command arguments must be strings or integers 

Queue metrics

For basic monitoring, it would be nice to have helper methods for:

  • list of queue names with tasks in last X time
  • sums of queued, active, scheduled, and error totals per queue

Task.tasks_from_queue has a limit of 1000 and does unnecessary work since it returns a list of tasks.

Is there a way to collect metrics about the total number of tasks per redis queue/sortedset another way independent of tasktiger?

Related to #20.

Enforce `hard_timeout` if child process hangs

Right now we raise an exception in the child process when a "hard" timeout occurs. This exception can be intercepted in the task code, or a task could be stuck in an uninterruptible state, causing a task to run longer than specified or even forever.

Should we keep the current hard_timeout behavior, and simply send SIGTERM / SIGKILL to the child a few seconds after the timeout happens? Or should we have a soft_timeout setting that's separate from hard_timeout? I'd ideally like to make soft_timeout something that can be intercepted by the task code (e.g. "stop task if timeout exceeded", or have a code section that can't be interrupted by a soft timeout).

Restarting the queue Redis (appendonly yes) would enqueue some processed tasks again

I found that some tasks which completed successfully are being enqueued again.

I have a transactional SMS queue that sends SMS via another platform like Twilio. I found one SMS task that was done successfully (the SMS record was found on the SMS platform), and there are no failed tasks in the queue.

Every night we back up the database and Redis AOF files. During the backup we shut down the database and Redis, but we don't shut down the tasktiger worker processes, so they raise Redis connection exceptions and are restarted by supervisord. After tasktiger restarts several times, some SMS tasks are enqueued again even though they have been processed successfully.

I have tested this: if I only restart my redis-server, some no-longer-existing tasks will be enqueued, as if my Redis had been rolled back.


Today, we think we have found the root cause of this issue.

Tasktiger uses some Lua scripts to take advantage of Lua language features such as for and function. The customized pipeline script calls other Lua scripts, which are recorded in the AOF file like this:

execute pipeline script with some sub-scripts: script A SHA1 and args for A, script B SHA1 and args for B, ...

BGREWRITEAOF will remove the SCRIPT LOAD commands, but you can still call these scripts via EVALSHA until Redis is restarted or SCRIPT FLUSH is run.

When Redis loads data from the AOF file, the pipeline script's EVALSHA calls fail because the scripts are not loaded, so some tasks are not removed from the active queue; the worker then moves them back to queued and processes them again.

We have to run BGREWRITEAOF every hour and rewrite the AOF before shutting down Redis.

Set expiration on redis Lock keys

A redis expiration of lock expiration + a troubleshooting window (i.e. 1 day) should be set on each lock object when it is created and renewed. This will ensure lock objects are not left orphaned in the Redis store.

lua directory doesn't pack on pypi

I installed tasktiger and tasktiger-admin via pip. When I run tasktiger-admin, it says:

IOError: [Errno 2] No such file or directory: '/home/dejavu/Projects/ljmall/env/lib/python2.7/site-packages/tasktiger/lua/execute_pipeline.lua'

I found there is no lua directory in tasktiger

Ensure clean shutdown of threads within tasks

Consider calling threading._shutdown() before os._exit(). We probably don't want to call sys.exit() because it would perform additional cleanup (e.g. calling atexit handlers twice).

Example for batch is not giving expected results

I have created a test function to test receiving batch tasks:

@tiger.task(batch=True, queue="test_queue")
def test_task(kwargs):
    print("-------")
    print(kwargs)
    print("--------")

For tiger configuration I have same for both sender and task definition:

tiger = tasktiger.TaskTiger(connection=conn, config={
    'BATCH_QUEUES': {
        'test_queue': 50
    },
})

When executing PYTHONPATH=. tasktiger, it prints the tasks one by one instead of one call with the list of args and kwargs.

Also, PYTHONPATH needs to be added to the supervisor configuration as well, which is not mentioned in the documentation.

Thanks.

Refactor queue config settings

Right now a queue's configuration is spread across multiple keys in the config. We should move to something like this instead and stop using the separate settings ONLY_QUEUES, BATCH_QUEUES, and SINGLE_WORKER_QUEUES/MAX_WORKERS_PER_QUEUE:

QUEUES = {
    'a': {
        'max_workers': 5,
        'batch': 50,
    },
    'b': {}
}

The SINGLE_WORKER_QUEUES setting should be deprecated since MAX_WORKERS_PER_QUEUE=1 is equivalent. Max workers was added in #115.

Pip install is missing /lua/execute_pipeline.lua

There seems to be a build issue with pip. Tasktiger installed through pip install tasktiger doesn't contain lua folder with execute_pipeline.lua:

Traceback (most recent call last):
  File "app.py", line 29, in <module>
    tiger = tasktiger.TaskTiger()
  File "/Users/maxim/code/gs-tracking/.env/lib/python3.6/site-packages/tasktiger/__init__.py", line 148, in __init__
    self.scripts = RedisScripts(self.connection)
  File "/Users/maxim/code/gs-tracking/.env/lib/python3.6/site-packages/tasktiger/redis_scripts.py", line 298, in __init__
    self._execute_pipeline = self.register_script_from_file('lua/execute_pipeline.lua')
  File "/Users/maxim/code/gs-tracking/.env/lib/python3.6/site-packages/tasktiger/redis_scripts.py", line 302, in register_script_from_file
    os.path.realpath(__file__)), filename)).read())
FileNotFoundError: [Errno 2] No such file or directory: '/Users/maxim/code/gs-tracking/.env/lib/python3.6/site-packages/tasktiger/lua/execute_pipeline.lua'

I checked the tar file on https://pypi.python.org/pypi/tasktiger/0.8.5 and lua folder is missing.

Passing "schedule" as a keyword argument to delay()

Hey team! First of all great work on tasktiger, it rocks.

I have a need to be able to dynamically create tasks with different user defined schedules. As such, I cannot provide a schedule when decorating the task function, and instead need to do it when I call delay on the task. I see in the docs that schedule is not a supported keyword argument, and was thinking about working on a pull request to add support for this, but I assumed that this was an intentional design decision for some reason. Could you provide more information about why this is not available as a kwarg for the delay call?

Thanks!

Call atexit() handlers

Since tasktiger forks a subprocess per task, it should call atexit handlers so that applications can properly clean up before exiting.

For example, right now if using Segment's analytics package (e.g. analytics.track(...)) within a TaskTiger task, you must remember to call analytics.flush(). https://segment.com/docs/sources/server/python/#how-do-i-flush-right-now-.

This however would be done automatically since Segment normally flushes on its own when atexit is called.

is there any plan to support the Windows platform ?

Hi,

In its current state (v0.10.1), this project is not compatible with the Windows platform due to the use of the fcntl module.

Is there any plan to make this project compatible with Windows ?

Also I guess the package descriptions should be fixed to reflect that fact. Currently setup.py reads:

setup(
    name='tasktiger',
    version='0.10.1',
    ...
    platforms='any',
    ...
    classifiers=[
        'Operating System :: OS Independent',

Periodic tasks can get queued with different default args/kargs

When tasktiger starts and automatically schedules periodic tasks the task ID is the hash of:
{"args": null, "func": "my_function", "kwargs": null}

If the same task is scheduled in a Python shell with my_function.delay() the hash is of:
{"args": [], "func": "my_function", "kwargs": {}}
which will be a different ID than the previous version. This can cause the same periodic task to be scheduled twice with two different IDs.

[] and {} are probably the more sane defaults. Two options to fix:

  • Force the hash to be None for [] and {} in gen_unique_id
  • Calculate the hash using [] and {} and have a migration to unschedule existing tasks that can be used before deploying the new version of TaskTiger so that two sets of periodic tasks don't end up scheduled.

TaskImportError in quickstart example

I'm following the Quickstart example described in the README and I'm getting TaskImportError:

tasks.py:

def create_event(data=None):
   print('Create event!')

In the terminal:

$ ipython
Python 3.5.2 (default, Aug 16 2016, 00:16:44)
IPython 5.1.0 -- An enhanced Interactive Python.

In [1]: import tasktiger, tasks

In [2]: from redis import StrictRedis

In [3]: redis = StrictRedis(host='localhost', port=6379, db=1)

In [4]: tiger = tasktiger.TaskTiger(connection=redis)

In [5]: tiger.delay(tasks.create_event)
Out[5]: <Task <function create_event at 0x10c7047b8>>

Worker output:

{"pid": 81586, "queue": "default", "exception_name": "tasktiger.exceptions.TaskImportError", "task_id": "4762d274dc73a4fd8b1153f98080eb2688eeae89ddd0a88ec972a6efe94d0be1", "func": "tasks.create_event", "traceback": "Traceback (most recent call last):\n File "/Users/mrosa/Projects/tasktiger/tasktiger/_internal.py", line 45, in import_attribute\n module = importlib.import_module(module_name)\n File "/Users/mrosa/.virtualenvs/api/lib/python3.5/importlib/init.py", line 126, in import_module\n return _bootstrap._gcd_import(name[level:], package, level)\n File "", line 986, in _gcd_import\n File "", line 969, in _find_and_load\n File "", line 956, in _find_and_load_unlocked\nImportError: No module named 'tasks'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n File "/Users/mrosa/Projects/tasktiger/tasktiger/worker.py", line 206, in _execute_forked\n func = tasks[0].func\n File "/Users/mrosa/Projects/tasktiger/tasktiger/task.py", line 150, in func\n self._func = import_attribute(self.serialized_func)\n File "/Users/mrosa/Projects/tasktiger/tasktiger/_internal.py", line 48, in import_attribute\n raise TaskImportError(e)\ntasktiger.exceptions.TaskImportError: No module named 'tasks'\n", "timestamp": "2017-01-05T15:34:48.259495Z", "time_failed": 1483630488.242499, "level": "error"}

I'm running from the master branch with Python 3.5.2 in a virtualenv.

Unique tasks should be unique per queue

When removing a unique task from a queue, we check other states (e.g. scheduled / error / etc.) before removing the task object from Redis to make sure the task is not referenced anywhere else. However, we don't check queues with other names, we just check the current queue. I propose to include the queue name in the task ID hash to prevent issues with removing unique tasks. Alternatively, we'd have to check all other queues for tasks before removing a unique task, or have some kind of reference counter.

This is potentially a breaking change.
