Giter Site home page Giter Site logo

think-queue's Introduction

think-queue for ThinkPHP6

安装

composer require topthink/think-queue

配置

配置文件位于 config/queue.php

公共配置

[
    'default'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名
]

创建任务类

推荐使用 app\job 作为任务类的命名空间 也可以放在任意可以自动加载到的地方

任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个fire方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别
每个方法会传入两个参数 think\queue\Job $job(当前的任务对象) 和 $data(发布任务时自定义的数据)

还有个可选的任务失败执行的方法 failed 传入的参数为$data(发布任务时自定义的数据)

下面写两个例子

namespace app\job;

use think\queue\Job;

class Job1{
    
    public function fire(Job $job, $data){
    
            //....这里执行具体的任务 
            
             if ($job->attempts() > 3) {
                  //通过这个方法可以检查这个任务已经重试了几次了
             }
            
            
            //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
            $job->delete();
            
            // 也可以重新发布这个任务
            $job->release($delay); //$delay为延迟时间
          
    }
    
    public function failed($data){
    
        // ...任务达到最大重试次数后,失败了
    }

}
namespace app\lib\job;

use think\queue\Job;

class Job2{
    
    public function task1(Job $job, $data){
    
          
    }
    
    public function task2(Job $job, $data){
    
          
    }
    
    public function failed($data){
    
          
    }

}

发布任务

think\facade\Queue::push($job, $data = '', $queue = null)think\facade\Queue::later($delay, $job, $data = '', $queue = null) 两个方法,前者是立即执行,后者是在$delay秒后执行

$job 是任务名
命名空间是app\job的,比如上面的例子一,写Job1类名即可
其他的需要些完整的类名,比如上面的例子二,需要写完整的类名app\lib\job\Job2
如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1app\lib\job\Job2@task2

$data 是你要传到任务里的参数

$queue 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填

监听任务并执行

&> php think queue:listen

&> php think queue:work

两种,具体的可选参数可以输入命令加 --help 查看

可配合supervisor使用,保证进程常驻

think-queue's People

Contributors

baiy avatar cexll avatar cjango avatar coolseven avatar lilwil avatar liu21st avatar yangweijie avatar yunwuxin avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

think-queue's Issues

入口文件index.php定义config目录执行队列失效

入口文件index.php
define('CONF_PATH', __DIR__ . '/../config/');
后,在application/config 目录定义
E:\WEBROOT\TP5-BASE\CONFIG
│ config.php
│ database.php
│ route.php
│ tags.php

└─extra
queue.php
这时执行队列 php think queue:listen 就失败

不在index.php文件定义 config目录,移动相关目录到application就可以

不知道哪里出的问题

现在运行后,会在runtime/log下记录很多日志

php think queue:work --daemon
会在runtime/log下记录很多日志,之前在调试模式下也没有的,是哪里的问题呢,不想要这么多。日志里全是这个,其它日志都不好找

[ 2016-12-09T11:37:45+08:00 ] 0.0.0.0 0.0.0.0 CLI
[ log ] cmd:think queue:work --queue=default --delay=0 --memory=128 --sleep=3 --tries=0 [运行时间:3.074919s][吞吐率:0.33req/s] [内存消耗:1,807.00kb] [文件加载:72]
[ info ] [ DB ] INIT mysql
[ info ] [ LOG ] INIT file
[ sql ] [ SQL ] SHOW COLUMNS FROM jobs_queue [ RunTime:0.001548s ]
[ sql ] [ SQL ] UPDATE jobs_queue SET reserved=0,reserved_at=NULL,attempts=attempts + 1 WHERE queue = 'default' AND reserved = 1 AND reserved_at <= 1481254602 [ RunTime:0.000654s ]
[ sql ] [ SQL ] SELECT * FROM jobs_queue WHERE queue = 'default' AND reserved = 0 AND available_at <= 1481254662 ORDER BY id asc LIMIT 1 FOR UPDATE [ RunTime:0.000779s ]


redis

WRONGTYPE Operation against a key holding the wrong kind of value

composer 安装不了?

[InvalidArgumentException]
Could not find package topthink/think-queue at any version for your minimum
-stability (stable). Check the package spelling or your minimum-stability

Thinkphp5.1.12 无法执行队列

驱动:database

检查错误时,先清理了全部缓存!

一、无法获取下一个有效任务,打印SQL出来:

SELECT * FROM ft_jobs WHERE queue = 'test' AND reserved = 1 AND reserved_at <= 1525945591 AND queue = 'test' AND reserved = 0 AND available_at <= 1525945651 ORDER BY id ASC LIMIT 1 FOR UPDATE

其中 reserved 条件有多个,并且冲突,如果 Database.php 中第 111 行 $this->db->name 修改成 Db::name 则任务能正常处理

二、任务也不能正常删除,同上问题,164行

三、如果数据库中存在任务,要PUSH新任务时会出错:

SQLSTATE[23000]: Integrity constraint violation: 1062 Duplicate entry '199' for key 'PRIMARY'

相关SQL为:

INSERT INTO ft_jobs (queue , payload , attempts , reserved , reserved_at , available_at , created_at , id) VALUES (:data__queue , :data__payload , :data__attempts , :data__reserved , NULL , :data__available_at , :data__created_at , :data__id)

其中对比源代码,多了一个 id ,并且这个 id 还被赋值导致主键重复

希望能增加调用时切换driver 的功能

一个项目里不见得队列都采取一种方案,比如有一部分是数据库做的队列,有的是 redis 的,或又有的是需要放 redis 里0号数据里,有的放1号里。

类似 laravel 里可以在调用的时候指定 queue 和 contection。

listen模式下CPU资源占用过高的问题

在listen模式下, 什么都没开始执行, 用于listen的php-cli会稳定吃掉一个核心的资源
image
队列的代码见 coolseven的教程, 用了一个简单的HelloWorld例子进行测试, 因为还并没有开始执行任务, 应该是listen的循环部分有问题
think-queue版本为1.1.4, 没有修改过框架
image
系统环境:
Windows 10 x64 1511
php 7.0.1 nts

Queue类找不到

把Queue类移到外边找不到了。。。怎么还发布了个稳定版

PHP Fatal error:  Class 'think\queue\Queue' not found in E:\test\tp5\vendor\topt
hink\think-queue\src\queue\Worker.php on line 54

项目根目录外执行队列报错:Could not open input file: think

使用时候发现,如果不在项目根目录下运行会出现报错:Could not open input file: think,经排查发现,think-queue/src/queue/Listener.php Line 52 直接使用了 think作为命令名,导致think文件定位失败。

原代码:

        $this->workerCommand =
            '"' . PHP_BINARY . '" think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s';

建议修改为:

$this->workerCommand =
            '"' . PHP_BINARY . '" ' . ROOT_PATH . 'think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s';

redis驱动下,expire 设置为null 时,无法使用延迟执行功能

在 redis 的配置中有一个 expire 配置项,用于设置每个任务的过期时间。
\queue\connector\Redis.phppop 方法中会 检查是否设置了 expire

public function pop($queue = null)
    {
        $original = $queue ?: $this->options['default'];

        $queue = $this->getQueue($queue);

        if (!is_null($this->options['expire'])) {               
            $this->migrateAllExpiredJobs($queue);
        }

        $job = $this->redis->lPop($queue);

        if ($job !== false) {
            $this->redis->zAdd($queue . ':reserved', time() + $this->options['expire'], $job);

            return new RedisJob($this, $job, $original);
        }
    }

如果 expire 配置为 null , 将跳过过期任务的检查。
但上面的代码中的实际逻辑是同时跳过了延迟执行的任务和过期的任务。
这里应该修改为

public function pop($queue = null)
    {
        $original = $queue ?: $this->options['default'];

        $queue = $this->getQueue($queue);

         this->migrateExpiredJobs($queue . ':delayed', $queue, false);

         if (!is_null($this->options['expire'])) {               
             $this->migrateExpiredJobs($queue . ':reserved', $queue);
         }

        $job = $this->redis->lPop($queue);

        if ($job !== false) {
            $this->redis->zAdd($queue . ':reserved', time() + $this->options['expire'], $job);

            return new RedisJob($this, $job, $original);
        }
    }

才对吧?

Queue::later(30,' ........不延迟执行 而是立刻执行

Queue::later(30,' ........不延迟执行 而是立刻执行

配置文件为:
return [ 'connector' => 'Redis', // Redis 驱动 'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 'default' => 'default', // 默认的队列名称 'host' => '127.0.0.1', // redis 主机ip 'port' => 6379, // redis 端口 'password' => '', // redis 密码 'select' => 0, // 使用哪一个 db,默认为 db0 'timeout' => 0, // redis连接的超时时间 'persistent' => false, ];

verbose参数丢失

think\queue\ListenerworkerCommand没有带有verbose-vvv参数

    public function __construct($commandPath)
    {
        $this->commandPath   = $commandPath;
        $this->workerCommand =
            '"' . PHP_BINARY . '" think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s';
    }

即使运行命令时使用了verbose, �这个参数并不会起作用,如php think queue:listen -vvv

think-queue版本2.0.3 tp版本5.1报错

执行php think queue:listen

[think\exception\ErrorException]
Non-static method think\Config::get() should not be called statically
这个错误 tp5.0的时候不会报这个错误呀

win下,redis不能执行队列的问题

thinkphp5,window下,later模式,队列里添加一条可以执行,但是只要是添加一条以上,就不能执行。
我自己将connector下的Redis.php里的pushExpiredJobsOntoNewQueue修改如下:
protected function pushExpiredJobsOntoNewQueue($to, $jobs, $attempt = true) { if ($attempt) { foreach ($jobs as &$job) { $attempts = json_decode($job, true)['attempts']; $job = $this->setMeta($job, 'attempts', $attempts + 1); } } if(isset($job)){ unset($job); } foreach ($jobs as $job){ call_user_func_array([$this->redis, 'rPush'], [$to, $job]); } }
这样就可以了,请问是什么原因

queue:work 命令下的失败事件

你好,在执行 queue:work 命令时,发现超过最大尝试次数的事件不可用。
我的命令行参数是

php think queue:work --queue helloQueue --tries 2

application\ tags.php 这个配置文件中,设置的任务失败的事件的配置是:

  //  配置任务失败事件处理 
    'queue.failed'=> [
        'application\\index\\behavior\\QueueFailedHandler',
    ],

看了下源代码,
think-queue\src\queue\Worker.php 类中,处理该事件的代码为:

    /**
     * Log a failed job into storage.
     * @param  \Think\Queue\Job $job
     * @return array
     */
    protected function logFailedJob(Job $job)
    {
        if (Hook::listen('queue.failed', $job, null, true)) {
            $job->delete();
            $job->failed();
        }

        return ['job' => $job, 'failed' => true];
    } 

根据TP官网中对行为的 文档
这里的'queue.failed'标签是不是需要改成 'queue_failed' ?

另外,think-queue 内部并没有内置这个‘queue.failed’事件,用户如果想要使用这个功能的话需要手动在 application\tags.php 这个配置文件中添加该事件标签,否则不会触发消费者的 failed() 方法。建议在readme 中补充一下。
谢谢。

异步执行无效

将connector设为sync时能正常执行,改为database或redis后,调用Queue::push能把数据插入到mysql或redis中,但是执行php think queue:listen或php think queue:work --daemon后台任务并不能被执行

Call to undefined method think\helper\Str::studly()

在vendor/topthink/think-queue/src/Queue.php第33行
$options = Config::get('queue');
$options为null,其实他应该获取的是application/extra/queue.php中的配置
在vendor/topthink/think-queue/src/Queue.php第38行
$class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\connector\\' . Str::studly($type);
studly这个方法不存在
下面是我修改后的方法
在application/config.php添加
'queue' => [ 'type' => 'redis', 'expire' => 60, 'default' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false ],
$class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\connector\\' . Str::studly($type);改为$class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\connector\\' . ucfirst($type);
运行正常!

数据库报错

SQLSTATE[42000]: Syntax error or access violation: 1064 You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'FOR UPDATE' at line 1

没人维护了?

use think\facade\Config;
$options = Config::pull('queue');
这是个假的吧???

执行区别

php think queue:listen

php think queue:work --daemon(不加--daemon为执行单个任务)

两个是一起执行,还是分开的两种不同执行方式,有啥区别

为什么用listen 执行cpu占用会很高

能否开多个work ,...............里面的任务会不会重复执行

关于cpu利用率

image

cpu占用率太高的问题怎么解决啊,
我现在开启两个cli 来运行 think-queue 用的是listen 请问怎么设置能够保证cpu不会长期这样占用

ps 平时没有任务的时候cpu也是这个值的

请这一系列错误如何解决?

[ 2018-09-03T04:15:59+08:00 ][ error ] [8]PDOStatement::execute(): send of 290 bytes failed with errno=32 Broken pipe
[ 2018-09-03T04:15:59+08:00 ][ error ] [2]Error while sending STMT_EXECUTE packet. PID=13156
[ 2018-09-03T04:16:31+08:00 ][ error ] [10501]SQLSTATE[HY000]: General error: 2006 MySQL server has gone away
[ 2018-09-03T04:17:04+08:00 ][ error ] [10501]SQLSTATE[HY000]: General error: 2006 MySQL server has gone away
[ 2018-09-03T04:20:05+08:00 ][ error ] [10501]SQLSTATE[HY000]: General error: 2006 MySQL server has gone away

谢谢

expire 的配置疑惑

@yunwuxin 你好,在使用tp-queue 这个队列库时,对于 expire 项的配置,有一处比较疑惑的地方:
假设现在存在两个队列,分别为 QueueA , QueueB ,
预估 QueueA , QueueB 中的每个job的大约耗时分别为 40秒 , 400秒。
那我们的expire 应该配置为 60 还是 600 呢?
如果配置为 60 的话,那么如果使用多个消费进程去处理 QueueB , 那么 QueueB 中的任务是不是频繁地被误作超时处理?
如果配置为 600 的话,那 QueueA 中真正超时的任务是不是总是得不到及时的恢复?

我用的是 php think queue:work --daemon 模式。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.