Giter Site home page Giter Site logo

rabbitmq-helper's Introduction

RabbitMqHelper

RabbitMQ 消息队列实现重试、异常处理。

流程图:

安装

 composer require huangbin2018/rabbitmq-helper

依赖

PHP >= 7.0

php-amqplib/php-amqplib >= 2.9

php_amqp.dll / php_amqp.so

使用

参考代码 demo/demo.php

// RabbitMq 连接配置
$rabbitMqConfig = [
    'host' => '10.10.30.211',
    'port' => 5672,
    'user' => 'wms_server',
    'password' => 'jf67DfRghk32f8o0',
    'vhost' => 'wms',
];
$mqObj = RabbitMqInstance::getInstance($rabbitMqConfig);
$isDelayed = false; // 是否为延时队列
// 读消息
$ret = $mqObj->getQueueMessage('demo.user.del', 'demo.user.delete', 'demoExChange', 10, $isDelayed, '');
var_dump($ret);

//生产者 发送消息
$publisher = $mqObj->getPublisherByExChange('demoExChange', $isDelayed);
for($i = 0; $i <= 100; $i++) {
    $data = [
        'user_name' => 'demo_' . $i,
        'user_id' => $i,
        'user_password' => md5(rand(0, 1000) . time()),
        'add_date' => date('Y-m-d H:i:s'),
    ];
    $source = 'mq_source'; // 消息来源
    $message = new \RabbitMqHelper\RabbitMQRetry\Message($data, $source);
    try {
        $publisher->publish($message, 'demo.user.add', true, 30); // 延时 30 秒
        $publisher->publish($message, 'demo.user.delete', true, 10);
        $publisher->publish($message, 'demo.user.edit', true);
    } catch (\Exception $e) {
        // 发生异常....
    }
}

// 启动消息消费者监听(阻塞模式)
// routingKey 路由匹配模式 * 匹配一个单词,# 匹配0个或者多个单词
// 如果 “*” and “#” 没有被使用,那么topic exchange就变成了direct exchange
$subConfig = [
    'queueName' => 'demo.user.add',
    'routingKey' => 'demo.user.add',
    'exchangeName' => 'demoExChange'
];
$mqCls->execSubscriber($subConfig, function (\RabbitMqHelper\RabbitMQRetry\SubMessage $message) {
    // TODO 业务逻辑实现
    echo date('Y-m-d H:i:s') . PHP_EOL;
    echo sprintf(
        "subscriber Message:\n RoutingKey <%s>\n MessageBody: %s\n",
        $message->getRoutingKey(),
        $message->getMessage()->serialize()
    );
    echo "================================================\n";

    // 业务处理逻辑


    // ask 处理成功返回 success (返回 success 后将会对消息进行处理确认),失败返回 failure (消息会重试处理)
    $returnSuccess = ['ask' => 'Success', 'message' => 'isSuccess'];
    $returnFailure = ['ask' => 'Failure', 'message' => 'Failure Message'];

    return $returnSuccess;
}, false, $isDelayed);

rabbitmq-helper's People

Contributors

huangbin2018 avatar

Stargazers

 avatar

Watchers

James Cloos avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.