
A distributed task queue built with asyncio and redis, with built-in web interface

Home Page: https://rearq.long2ice.io/rearq

License: Apache License 2.0



ReArq


Introduction

ReArq is a distributed task queue built with asyncio and redis, rewritten from arq with improvements and a built-in web interface.

You can try the demo online here.

Features

  • AsyncIO support, easy integration with FastAPI.
  • Delayed tasks, cron tasks and async tasks.
  • Full-featured built-in web interface.
  • Built-in distributed task lock to ensure the same task runs only once at a time.
  • Other powerful features to be discovered.

Screenshots

(screenshots: dashboard, worker, task, job, result)

Requirements

  • Redis >= 5.0
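ReArq relies on Redis stream commands (e.g. XGROUP, as seen in the issues below), which were introduced in Redis 5.0; older servers fail with errors like `unknown command 'XGROUP'`. A minimal sketch of a version guard you could run at startup against the `redis_version` reported by INFO (the helper name is ours, not part of rearq):

```python
def redis_supports_streams(version: str) -> bool:
    """Return True if this Redis version has stream commands (XADD/XGROUP), i.e. >= 5.0."""
    major, minor, *_ = (int(part) for part in version.split("."))
    return (major, minor) >= (5, 0)
```

For example, the worker log below reports `redis_version=6.2.1`, which passes; the `unknown command 'XGROUP'` errors in the issues indicate a pre-5.0 server.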

Install

Use MySQL backend:

pip install rearq[mysql]

Use PostgreSQL backend:

pip install rearq[postgres]

Quick Start

Task Definition

# main.py
from tortoise import Tortoise

from rearq import ReArq

# replace user, password and host with your own MySQL DSN
rearq = ReArq(db_url='mysql://root:password@127.0.0.1:3306/rearq')


@rearq.on_shutdown
async def on_shutdown():
    # do your cleanup work here, e.g. close db connections
    print("shutdown")


@rearq.on_startup
async def on_startup():
    # do your initialization work here
    print("startup")
    # Tortoise ORM must be initialized here; `settings` is your own config module
    await Tortoise.init(
        db_url=settings.DB_URL,
        modules={"rearq": ["rearq.server.models"]},
    )


@rearq.task(queue="q1")
async def add(self, a, b):
    return a + b


@rearq.task(cron="*/5 * * * * * *")  # run the task every 5 seconds
async def timer(self):
    return "timer"

Run rearq worker

> rearq main:rearq worker -q q1 -q q2 # consume tasks from q1 and q2 at the same time
2021-03-29 09:54:50.464 | INFO     | rearq.worker:_main:95 - Start worker success with queue: rearq:queue:default
2021-03-29 09:54:50.465 | INFO     | rearq.worker:_main:96 - Registered tasks: add, sleep, timer_add
2021-03-29 09:54:50.465 | INFO     | rearq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.43M clients_connected=5 db_keys=6

Run rearq timer

If you have cron tasks or delayed tasks, you also need to run the timer:

> rearq main:rearq timer
2021-03-29 09:54:43.878 | INFO     | rearq.worker:_main:275 - Start timer success
2021-03-29 09:54:43.887 | INFO     | rearq.worker:_main:277 - Registered timer tasks: timer_add
2021-03-29 09:54:43.894 | INFO     | rearq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.25M clients_connected=2 db_keys=6

You can also run the timer together with the worker via rearq main:rearq worker -t.

Integration in FastAPI

from fastapi import FastAPI
from tortoise import Tortoise

from main import add, rearq  # the task module defined above

app = FastAPI()


@app.on_event("startup")
async def startup() -> None:
    # `settings` is your own config module
    await Tortoise.init(
        db_url=settings.DB_URL,
        modules={"rearq": ["rearq.server.models"]},
    )


@app.on_event("shutdown")
async def shutdown() -> None:
    await rearq.close()


# then run the task in a view
@app.get("/test")
async def test():
    job = await add.delay(args=(1, 2))
    # or
    job = await add.delay(kwargs={"a": 1, "b": 2})
    # or
    job = await add.delay(1, 2)
    # or
    job = await add.delay(a=1, b=2)
    result = await job.result(timeout=5)  # wait up to 5 seconds for the result
    print(result.result)
    return result
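job.result(timeout=5) waits up to the timeout for the worker to publish a result. Conceptually this is a poll-until-deadline loop; a minimal standalone illustration of the pattern (fetch here is a hypothetical stand-in for the backend lookup, not rearq's internal API):

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def wait_for_result(
    fetch: Callable[[], Awaitable[Optional[Any]]],
    timeout: float,
    interval: float = 0.1,
) -> Optional[Any]:
    """Poll `fetch` until it returns a non-None value or `timeout` seconds elapse."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        value = await fetch()
        if value is not None:
            return value
        if loop.time() >= deadline:
            return None  # timed out, analogous to job.result() giving up
        await asyncio.sleep(interval)
```

This is only a sketch of the waiting behavior; rearq's actual result retrieval goes through its own storage backend.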

Start web interface

> rearq main:rearq server
Usage: rearq server [OPTIONS]

  Start rest api server.

Options:
  --host TEXT         Listen host.  [default: 0.0.0.0]
  -p, --port INTEGER  Listen port.  [default: 8000]
  -h, --help          Show this message and exit.

After the server starts, you can visit http://127.0.0.1:8000/docs to see all APIs and http://127.0.0.1:8000 to see the web interface.

Any other options are passed through to uvicorn directly, such as --root-path:

rearq main:rearq server --host 0.0.0.0 --root-path /rearq

Mount as FastAPI sub app

You can also mount the rearq server as a FastAPI sub-application.

from fastapi import FastAPI

from examples.tasks import rearq
from rearq.server.app import app as rearq_app

app = FastAPI()

app.mount("/rearq", rearq_app)
rearq_app.set_rearq(rearq)

Start worker inside app

You can also start the worker inside your app.

@app.on_event("startup")
async def startup():
    await rearq.init()
    await rearq_app.start_worker(with_timer=True, block=False)

ThanksTo

  • arq, Fast job queuing and RPC in python with asyncio and redis.

License

This project is licensed under the Apache-2.0 License.

rearq's People

Contributors

long2ice


rearq's Issues

An error will be reported regardless of the name used

-> raise UsageError("Task name must be unique!")

@rearq.task(queue="add")
async def add(a, b):
    return a + b


@app.get("/test/")
async def test_task():
    job = await add.delay(args=(1, 2))
    result = await job.result(timeout=5)  # wait up to 5 seconds for the result
    print(result.result)
    return result

Motivation & Comparisons?

Hi there.
I wonder if you could share a bit about the project's motivation. I'm curious why this wasn't a PR to arq, and what the main pain points were that made you write this instead of using Celery/RQ/Dramatiq/arq.
Thanks!

rearq docs

Hello! Perhaps you are currently writing the usage documentation for this project?

How to add cron tasks dynamically?

Is it currently only possible to hard-code timed tasks in the program?

@rearq.task(cron="0 * * * *")
async def timer_add():
    return "timer"

aioredis.errors.ReplyError: ERR unknown command 'XGROUP'

aioredis.errors.ReplyError: ERR unknown command 'XGROUP'
Task was destroyed but it is pending!

Because of the program's async stream operations, Redis 5.x or above is required; otherwise the errors above are raised. The author could add a requirements.txt declaring dependency versions.

How to connect to multiple databases with tortoise-orm?

In this project the database is passed in via db_url, but my project connects to two databases. I tried reading my db_config in with tortoise.init during start_up, but once multiple databases are configured every model must specify an app, and the built-in models don't specify one, so errors occur.
I tried to fix this myself but really have no idea how...

job.result() returns a ValidationError

Hello. Thanks for rearq

When I try to get the result of the task I get the error:

...
File "/mnt/c/fastatz/venv/lib/python3.8/site-packages/rearq/job.py", line 103, in result_info
    return JobResult.parse_obj(v)
  File "pydantic/main.py", line 454, in pydantic.main.BaseModel.parse_obj
pydantic.error_wrappers.ValidationError: 1 validation error for JobResult
__root__
  JobResult expected dict not bytes (type=type_error)

Redis returns a byte object and pydantic cannot read it.

I replaced return JobResult.parse_obj(v) with return JobResult.parse_raw(v).
Another option is to use json.loads().
Please fix that :)
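The json.loads route suggested above works because json.loads has accepted bytes (as well as str) since Python 3.6, so the bytes Redis returns can be decoded directly. A minimal standalone illustration (the payload shape is invented for the example, not rearq's actual schema):

```python
import json

# Redis clients return stored values as bytes by default.
raw = b'{"task": "add", "result": 3}'  # hypothetical job-result payload

# json.loads accepts bytes directly, so no explicit .decode() is needed:
data = json.loads(raw)
print(data["result"])
```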

Cannot connect to remote redis

Traceback (most recent call last):
File "/home/wajahath/Desktop/projects/fastApiProject/venv/bin/rearq", line 8, in
sys.exit(main())
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/rearq/cli.py", line 100, in main
cli()
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/click/core.py", line 1128, in call
return self.main(*args, **kwargs)
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/click/core.py", line 1053, in main
rv = self.invoke(ctx)
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/click/core.py", line 1395, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/click/core.py", line 754, in invoke
return __callback(*args, **kwargs)
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/click/decorators.py", line 26, in new_func
return f(get_current_context(), *args, **kwargs)
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/rearq/cli.py", line 20, in wrapper
return loop.run_until_complete(f(*args, **kwargs))
File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/rearq/cli.py", line 63, in worker
await w.run()
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/rearq/worker.py", line 265, in run
await self._pre_run()
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/rearq/worker.py", line 249, in _pre_run
raise e
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/rearq/worker.py", line 244, in _pre_run
await self._redis.xgroup_create(self.queue, self.group_name, mkstream=True)
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/aioredis/client.py", line 1085, in execute_command
return await self.parse_response(conn, command_name, **options)
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/aioredis/client.py", line 1101, in parse_response
response = await connection.read_response()
File "/home/wajahath/Desktop/projects/fastApiProject/venv/lib/python3.9/site-packages/aioredis/connection.py", line 919, in read_response
raise response from None
aioredis.exceptions.ResponseError: unknown command 'XGROUP'

Process finished with exit code 1

I'm trying to connect to a remote Redis, which is accessible through the redis-cli but not with rearq.
However, I am able to connect to a local Redis of the same version.

job_retry

Hi author, I noticed that the job_retry parameter of tasks still exists, but I don't see any retry mechanism.

The task enters the Redis delayed queue, but it doesn't seem to get scheduled.

aioredis.errors.AuthError: NOAUTH Authentication required.

main.py

from rearq import ReArq

rearq = ReArq(db_url='******',
              redis_host="192.168.11.58",
              redis_password="******",
              redis_port=6382,
              redis_db=11)


@rearq.on_shutdown
async def on_shutdown():
    # you can do some clean work here like close db and so on...
    print("shutdown")


@rearq.on_startup
async def on_startup():
    # you should do some initialization work here, such tortoise-orm init and so on...
    print("startup")


@rearq.task(queue="myqueue")
async def add(self, a, b):
    return a + b


@rearq.task(cron="*/5 * * * * * *")  # run the task every 5 seconds
async def timer(self):
    return "timer"

Start command: rearq main:rearq worker -q myqueue
init.py

        self._pool = await pool_factory(
            addr, db=self.redis_db, password=self.redis_password, encoding="utf8"
        )
        self._redis = Redis(self._pool)

Then I added debug code await self._redis.get("key") after this, and it correctly returned 111 (I had written that key into the corresponding Redis db beforehand), which proves the Redis connection works.
However, startup still fails with aioredis.errors.AuthError: NOAUTH Authentication required.
The full error output is below:
111
/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aiomysql/cursors.py:158: Warning: Table 'jobresult' already exists
while (await self.nextset()):
Traceback (most recent call last):
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredlock/redis.py", line 195, in set_lock
with await self.connect() as redis:
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredlock/redis.py", line 166, in connect
self._pool = await self._create_redis_pool(address, **redis_kwargs)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredlock/redis.py", line 114, in _create_redis_pool
return await aioredis.create_redis_pool(*args, **kwargs)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredis/commands/init.py", line 188, in create_redis_pool
pool = await create_pool(address, db=db,
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredis/pool.py", line 58, in create_pool
await pool._fill_free(override_min=False)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredis/pool.py", line 386, in _fill_free
await conn.execute('ping')
aioredis.errors.AuthError: NOAUTH Authentication required.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/daji/SynologyDrive/code/project/make_table/venv/bin/rearq", line 8, in
sys.exit(main())
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/rearq/cli.py", line 96, in main
cli()
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/click/core.py", line 1137, in call
return self.main(*args, **kwargs)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/click/core.py", line 1062, in main
rv = self.invoke(ctx)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/click/core.py", line 1668, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/click/core.py", line 763, in invoke
return __callback(*args, **kwargs)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/click/decorators.py", line 26, in new_func
return f(get_current_context(), *args, **kwargs)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/rearq/cli.py", line 19, in wrapper
return loop.run_until_complete(f(*args, **kwargs))
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/rearq/cli.py", line 60, in worker
await w.run()
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/rearq/worker.py", line 249, in run
await self._pre_run()
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/rearq/worker.py", line 236, in _pre_run
async with await self._lock_manager.lock(constants.WORKER_KEY_LOCK):
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredlock/algorithm.py", line 142, in lock
await self._set_lock(resource, lock_identifier, lease_time)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredlock/algorithm.py", line 92, in _set_lock
raise error
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredlock/algorithm.py", line 77, in _set_lock
elapsed_time = await self.redis.set_lock(resource, lock_identifier, lease_time)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredlock/redis.py", line 355, in set_lock
raise_error(successes, 'Can not acquire the lock "%s"' % resource)
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredlock/redis.py", line 26, in raise_error
raise [e for e in errors if type(e) is LockAcquiringError][0]
File "/Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredlock/redis.py", line 208, in set_lock
raise LockAcquiringError('Can not set lock') from exc
aioredlock.errors.LockAcquiringError: Can not set lock
Task was destroyed but it is pending!
task: <Task pending name='Task-10' coro=<RedisConnection._read_data() done, defined at /Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredis/connection.py:180> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd0788133a0>()]> cb=[RedisConnection.init..() at /Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredis/connection.py:168]>
Task was destroyed but it is pending!
task: <Task pending name='Task-7' coro=<RedisConnection._read_data() running at /Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd0787eaac0>()]> cb=[RedisConnection.init..() at /Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredis/connection.py:168]>
Task was destroyed but it is pending!
task: <Task pending name='Task-13' coro=<RedisConnection._read_data() running at /Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd078813520>()]> cb=[RedisConnection.init..() at /Users/daji/SynologyDrive/code/project/make_table/venv/lib/python3.8/site-packages/aioredis/connection.py:168]>
