kcloze / swoole-jobs Goto Github PK
View Code? Open in Web Editor NEW🚀Dynamic multi process worker queue base on swoole, like gearman but high performance.
🚀Dynamic multi process worker queue base on swoole, like gearman but high performance.
When using "./swoole-jobs stop http" the main server http process is still running.
5 loops are run and the error swoole-jobs kill failed.
is displayed.
Does I need to setup something ? Any idea ?
在连接rabbitMQ的时候用户密码错误,但是异常抛出的是类不存在,这个要改下
版本4.0
Fatal error: Uncaught Error: Class 'Kcloze\Jobs\Console' not found in /Users/xiaodi/docker-swoole/workspace/thinkphp6/test.php:82
Stack trace:
注意看这里
安装时 composer提示
使用了 --ignore-platform-reqs
Skipped installation of bin swoole-jobs for package kcloze/swoole-jobs: file not found in package
define('APP_PATH', __DIR__ . '/app/');
define('RUNTIME_PATH', __DIR__ . '/runtime/');
define('SWOOLE_JOBS_ROOT_PATH', __DIR__);
date_default_timezone_set('Asia/Shanghai');
// ThinkPHP 引导文件
require SWOOLE_JOBS_ROOT_PATH . '/vendor/autoload.php';
require SWOOLE_JOBS_ROOT_PATH . '/vendor/topthink/framework/src/think/App.php';
$swoole = new \Kcloze\Jobs\Console($config);
$console->run();
查看了vendor文件夹 不见Console.php
我现在的场景是这样的,下单队列->更新订单->处理K线数据->撤单,总共4个队列,配置如下
topics' => [
// 下单
['name' => 'SpotMainQueue', 'workerMinNum' => 1, 'workerMaxNum' => 1, 'queueMaxNum'=> 50000],
// 更新订单数据到 Mysql
['name' => 'UpdateOrder', 'workerMinNum' => 1, 'workerMaxNum' => 1, 'queueMaxNum'=> 50000],
// 更新K线图数据
['name' => 'Update1MinKLine', 'workerMinNum' => 1, 'workerMaxNum' => 1, 'queueMaxNum'=> 50000],
// 撤单队列
['name' => 'CancelOrder', 'workerMinNum' => 1, 'workerMaxNum' => 1, 'queueMaxNum'=> 50000]
],
因为我们对顺序执行有要求,所以我代码的处理流程是下单完了之后就丢给更新订单,然后再是K线,最后如果没法成交的,就丢撤单队列。
那么我现在都是给1个worker去处理,这样会不会出现抢占处理的问题?
谢谢解答.
日志中很多的pop error data,这个是什么原因导致的
感觉应该用 rPop 代替 lPop。
以下代码出自 Kcloze\Jobs\Queue\RedisTopicQueue
public function push($topic, JobObject $job, $delayStrategy=1, $serializeFunc='php'): string
{
if (!$this->isConnected()) {
return '';
}
$this->queue->lPush($topic, Serialize::serialize($job, $serializeFunc));
return $job->uuid ?? '';
}
public function pop($topic, $unSerializeFunc='php')
{
if (!$this->isConnected()) {
return;
}
$result = $this->queue->lPop($topic);
//判断字符串是否是php序列化的字符串,目前只允许serialzie和json两种
$unSerializeFunc=Serialize::isSerial($result) ? 'php' : 'json';
return !empty($result) ? Serialize::unSerialize($result, $unSerializeFunc) : null;
}
操作系统 centos7.6
php 7.2.19
扩展是参照tp5的项目说明集成的
一直提示 lSize() 已弃用,以下是错误日志
2019/07/24 17:52:40 YCFLOG [swoole-jobs] [error] [application.log] [PID65853]
错误类型:think\exception\ErrorException
错误代码:0
错误信息:Function Redis::lSize() is deprecated
错误堆栈:#0 /home/wwwroot/phpstorm/villin-task/vendor/kcloze/swoole-jobs/src/Queue/RedisTopicQueue.php(94): think\Error::appError(8192, 'Function Redis:...', '/home/wwwroot/p...', 94, Array)
#1 /home/wwwroot/phpstorm/villin-task/vendor/kcloze/swoole-jobs/src/Jobs.php(48): Kcloze\Jobs\Queue\RedisTopicQueue->len('test')
#2 /home/wwwroot/phpstorm/villin-task/vendor/kcloze/swoole-jobs/src/Process.php(138): Kcloze\Jobs\Jobs->run('test')
#3 [internal function]: Kcloze\Jobs\Process->Kcloze\Jobs{closure}(Object(Swoole\Process))
#4 /home/wwwroot/phpstorm/villin-task/vendor/kcloze/swoole-jobs/src/Process.php(204): Swoole\Process->start()
#5 [internal function]: Kcloze\Jobs\Process->Kcloze\Jobs{closure}(17)
#6 [internal function]: swoole_event_wait()
#7 {main}
Warning: Cannot modify header information - headers already sent by (output started at /mnt/hgfs/htdocs/swoole-jobs/src/Console.php:41) in /mnt/hgfs/htdocs/swoole-jobs/vendor/bramus/router/src/Bramus/Router/Router.php on line 294
命令: composer require kcloze/swoole-jobs
拷贝vendor\kcloze\swoole-jobs\swoole-jobs
(或swoole-jobs.php)文件到项目根目录,并调整代码为:
// 定义应用目录
define('APP_PATH', __DIR__ . '/application/');
// 定义应用缓存目录
define('RUNTIME_PATH', __DIR__ . '/runtime/');
//ini_set('default_socket_timeout', -1);
date_default_timezone_set('Asia/Shanghai');
// 加载ThinkPHP基础文件
require __DIR__ . '/thinkphp/base.php';
// 执行应用
\think\App::initCommon();
//项目配置
//$config = require_once APP_PATH . '/extra/swoole-jobs.php';
$config = config('swoole-jobs');
$console = new Kcloze\Jobs\Console($config);
$console->run();
拷贝vendor\kcloze\swoole-jobs\config.php
文件到application\extra\swoole-jobs.php
,并讲日志及swoole主进程文件目录改到runtime目录下:
调整如下:
[
'logPath' => RUNTIME_PATH . 'swoole-jobs' . DS . 'log',//swoole-jobs运行日志目录
........
'pidPath' => RUNTIME_PATH . 'swoole-jobs' . DS .'pid',//swoole主进程id记录文件
]
在application目录下新增jobs目录,命名空间为app\jobs
,作用同swoole-jobs\src\Jobs
目录,用来编写任务实现的具体代码。
在application目录下新增service目录,命名空间为app\service
,,用来保存业务逻辑代码,此处新增SwooleJob.php业务处理类用来简化swoole-jobs的任务入队操作。
<?php
namespace app\service;
use Kcloze\Jobs\JobObject;
use Kcloze\Jobs\Logs;
use Kcloze\Jobs\Queue\BaseTopicQueue;
use Kcloze\Jobs\Queue\Queue;
/**
* 异步任务操作类
* 开发步骤:
* 1、在application\jobs目录中编写任务处理类,可以一个类中编写多个处理方法,每个方法设置不同参数
* 2、在application\extra\swoole-jobs.php文件中编辑job->topics处理进程任务数
* 3、在其他地方调用\app\service\SwooleJob::push()方法,并设置对应参数即可
*
*
* @package app\service
*/
class SwooleJob
{
/**
* 私有构造函数,防止外界实例化对象
*/
private function __construct()
{
}
/**
* 私有克隆函数,防止外办克隆对象
*/
private function __clone()
{
}
/**
* 推送swoole异步任务
* @param string $jobName 任务名称,对应application/jobs/目录中的类名
* @param string $method 类方法名
* @param array $params 方法参数,类型为数组,数组值顺序对应方法参数顺序
* @param array $jobExt 任务附加参数['delay'=>'延迟毫秒数','priority'=>'任务权重,数字类型,范围:1-5']
* @throws \Exception
*/
public static function push($jobName, $method = '', $params = [], $jobExt = [])
{
if (empty($jobName)){
throw new \Exception('异步任务名不能为空');
}
$config = config('swoole-jobs');
$logger = Logs::getLogger($config['logPath'] ?? '', $config['logSaveFileApp'] ?? '');
$queue = Queue::getQueue($config['job']['queue'], $logger);
//设置工作进程参数
$queue->setTopics($config['job']['topics']);
$jobExtras['delay'] = isset($jobExt['delay']) ? $jobExt['delay'] : 0;
$jobExtras['priority'] = isset($jobExt['priority']) ? $jobExt['priority'] : BaseTopicQueue::HIGH_LEVEL_1;
//任务类名称
$jobClass = '\app\jobs\\' . $jobName;
$job = new JobObject($jobName, $jobClass, $method, $params, $jobExtras);
$result = $queue->push($jobName, $job);
return $result;
}
}
然后可以在框架任意地方使用下列代码新增异步任务
$name = \app\service\SwooleJob::push('MyJob','MyMethod', ['方法参数1值','方法参数2值','...'], ['任务其他参数...']);
队列消费服务操作与swoole-jobs一致。
最后感谢作者分享这么好的异步任务项目。
1、启动方式,建议可以参考下 laravels、swoolefy
2、整体项目结构有点乱, 包命名规则和继承都没写上,等于摆设。(src/*,建议用app作为顶级命名空间)
3、redis加上一个选择库吧。
4、队列这块建议实现一个适配器模式,后面用mysql、redis、其它MQ都可以带入.
如题
希望能使用如下的方式的push一个job:
$queue->push(new \Kcloze\Jobs\Jobs\MyJob(['method' => 'test1', 'params' => ['kcloze', time()]]));
这样的话,JobObject类就需要进行调整,具体业务的job类比如MyJob类则需要继承JobObject。
可以将action的具体类也放在配置文件中,扩展中设置action的interface或者baseClass就够了吧,由使用者在项目中自行实现start方法?
很好的几点:
1、很好的队列、消费结构,易于扩展。
2、让我学会了swoole process
3、通过信号机制,master进程对子进程进行监控。有了具体的实例,也算是加深了我对信号的理解.
但是我有一个疑问啊:
怎么调试子进程的代码?比如子进程中调用了一个undefined的函数,自然会exit()掉,会给父进程发送一个SIGCHLD信号。但是,好像没办法直接看到"function xxx undefined"这个fatal error错误。请问有什么办法?
//应该私有化属性
private static $instance;
//应该私有化克隆方法
private function __clone()
{
}
//应该私有化构造方法
private function __construct($config=[])
{
}
//没有private 也没有私有化克隆方法 好像单例模式失效了
public function __construct($logPath, $logSaveFileApp='', $logSystem = '')
{
}
/**
* 获取日志实例.
*
* @$logPath
*
* @param mixed $logPath
* @param mixed $logSaveFileApp
* @param mixed $logSystem
*/
public static function getLogger($logPath='', $logSaveFileApp='', $logSystem = '')
{
if (isset(self::$instance) && null !== self::$instance) {
return self::$instance;
}
self::$instance=new self($logPath, $logSaveFileApp, $logSystem);
return self::$instance;
}
//swoole-jobs/src/Console.php 使用中Logs 直接用new Logs方式
public function __construct($config)
{
Config::setConfig($config);
$this->config = Config::getConfig();
//初始化日志记录的路径,保存的文件名,系统名
$this->logger = new Logs($this->config['logPath'] ?? '', $this->config['logSaveFileApp'] ?? '', $this->config['system'] ?? '');
}
2021/05/27 11:37:26 YCFLOG [swoole-jobs] [error] [error] [PID73404]
job execute too fast, uuid: MyJob60af13eae56af.1622086634.9397, execTime:4.6968460083008E-5
平时还蛮多人用我这个小工具,可能会有志同道合的朋友看到
拉钩职位描述:PHP架构师
职位描述:
工作职责:
1.负责产品的设计研发工作;
2.参与整体系统架构设计,找出系统瓶颈,改进系统算法,提高系统性能;
3.参与重大项目的技术方案设计及技术评审。
任职资格:
1、有Web系统设计开发经验,熟悉常用设计模式;
2、熟悉PHP,熟悉PHP常用框架;
3、优秀的数据库设计优化能力,至少熟悉一种数据库应用,熟悉mysql者优先;
4、熟悉Linux,熟悉一门或几门以下语言;C/C++/go等;
5、扎实的计算机基础,熟悉数据结构,有较强的算法设计能力;
6、学习能力强,有较好的沟通能力,能迅速融入团队。
以下经验将有加分:
1、熟悉swoole,深入其原理,有swoole实际项目研发经验;
2、具有大型,高并发Web经验,有C语言和PHP扩展开发经验;
默认可以不需要rabbitmq包
[2018-01-03 13:28:45 @16252.0] WARNING swSignalfd_setup: signalfd() failed. Error: Function not implemented[38]
有可能影响子进程信号注册
使用配置如下
return $config = [
//log目录
'logPath' => __DIR__ . '/../log',
'logSaveFileApp' => 'application.log', //默认log存储名字
'logSaveFileWorker' => 'crontab.log', // 进程启动相关log存储名字
'pidPath' => __DIR__ . '/../log',
'sleep' => 2, // 队列没消息时,暂停秒数
'processName' => ':swooleTopicQueue', // 设置进程名, 方便管理, 默认值 swooleTopicQueue
//job任务相关
'job' => [
'topics' => [
// ['name'=>'MyJob', 'workerMinNum'=>1, 'workerMaxNum'=>20],
// ['name'=> 'MyJob2', 'workerMinNum'=>3, 'workerMaxNum'=>10],
// ['name'=> 'MyJob3', 'workerMinNum'=>1, 'workerMaxNum'=>10],
['name'=> 'football_match_list', 'workerMinNum'=>1, 'workerMaxNum'=>10],
],
// redis
'queue' => [
'class' => '\Kcloze\Jobs\Queue\RedisTopicQueue',
'host' => '127.0.0.1',
'port' => 6379,`
表现出的问题是.已经挤压了很多消息,但是主进程也已经检测到了,但是一直不处理.
记录 crontab日志如下
2018/01/03 11:22:28 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:22:30 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:22:32 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:22:34 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:22:36 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:22:38 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:22:40 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:22:42 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:22:44 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:22:46 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:22:48 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:23:50 [info] [crontab.log] topic: football_match_list runnning len: 0
2018/01/03 11:23:52 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:23:54 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:23:56 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:23:58 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:24:00 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:24:02 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:24:04 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:24:06 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:24:08 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:24:10 [info] [crontab.log] topic: football_match_list runnning len: 24
2018/01/03 11:24:12 [info] [crontab.log] topic: football_match_list runnning len: 24
`
我执行了composer require kcloze/swoole-jobs:
但是提示以下的错误:
Using version ^4.2 for kcloze/swoole-jobs
./composer.json has been updated
Running composer update kcloze/swoole-jobs
Loading composer repositories with package information
Updating dependencies
Your requirements could not be resolved to an installable set of packages.
Problem 1
- Root composer.json requires kcloze/swoole-jobs ^4.2 -> satisfiable by kcloze/swoole-jobs[v4.2.0].
- kcloze/swoole-jobs v4.2.0 requires symfony/console ^3.4 -> found symfony/console[v3.4.0-BETA1, ..., 3.4.x-dev] but the package is fixed to v5.4.10 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
Use the option --with-all-dependencies (-W) to allow upgrades, downgrades and removals for packages currently locked to specific versions.
You can also try re-running composer require with an explicit version constraint, e.g. "composer require kcloze/swoole-jobs:*" to figure out if any version is installable, or "composer require kcloze/swoole-jobs:^2.1" if you know which you need.
Installation failed, reverting ./composer.json and ./composer.lock to their original content.
请问如何解决?
docs 裡面的 安裝 / 入門 / 開發 文件是空的
执行docker build -t swoole-jobs .之后
`Get:5 http://mirrors.aliyun.com jessie/updates/main amd64 Packages [825 kB]
Get:6 http://mirrors.aliyun.com jessie/main amd64 Packages [9098 kB]
Fetched 10.1 MB in 7s (1282 kB/s)
W: Failed to fetch http://mirrors.aliyun.com/debian/dists/jessie-updates/InRelease Unable to find expected entry 'main/binary-amd64/Packages' in Release file (Wrong sources.list entry or malformed file)
E: Some index files failed to download. They have been ignored, or old ones used instead.
The command '/bin/sh -c apt-get update && apt-get install -y libpng-dev libpq-dev g++ libicu-dev libxml2-dev htop git vim libfreetype6-dev libjpeg62-turbo-dev libmcrypt-dev zlib1g-dev librabbitmq-dev' returned a non-zero code: 100
`
在使用RabbitMQ作为消息队列的时候,队列中有大量的消息。相关进程在消费队列中的消息一段时间后,staticWorker与dynamicWorker进程都一直存在,但却不在继续消费消息,导致队列中的任务大量的积攒。请问这个是什么原因呢?
php8.0.2 + swoole4.6.2
Deprecated: Method Swoole\Timer::set() is deprecated
Warning: Undefined array key 2790123 in ****Kcloze/Jobs/Process.php on line 200
Warning: file_get_contents(***master.info): Failed to open stream: No such file or directory in ***Kcloze/Jobs/Process.php on line 417
客户端请求量上来后,就会一直刷这个信息......
ERROR swFactoryProcess_finish (ERROR 1004): send 159 byte failed, because connection[fd=3689] is closed.
https://github.com/kcloze/swoole-jobs/blob/master/src/Process.php#L440
是为了控制主进程的调度不被影响还是协程不适合消费队列?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.