Giter Site home page Giter Site logo

xingwenge / canal-php Goto Github PK

View Code? Open in Web Editor NEW
383.0 10.0 67.0 392 KB

Alibaba mysql database binlog incremental subscription & consumer components Canal's php client[阿里巴巴mysql数据库binlog的增量订阅&消费组件 Canal 的 php 客户端 ] https://github.com/alibaba/canal

PHP 100.00%

canal-php's Introduction

canal-php

一.canal-php 简介

canal-php 是阿里巴巴开源项目 Canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 php 客户端。为 php 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql数据库binlog的增量订阅&消费组件。

基于日志增量订阅&消费支持的业务:

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变化等重要业务消息

关于 Canal 的更多信息请访问 https://github.com/alibaba/canal/wiki

二.应用场景

canal-php 作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。举一些实际的使用例子:

1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。

2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等

3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis

4.数据库异地备份、数据同步

5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。

6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。

三.工作原理

canal-php 是 Canal 的 php 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。

四.工作流程

1.Canal连接到mysql数据库,模拟slave

2.canal-php 与 Canal 建立连接

3.数据库发生变更写入到binlog

4.Canal向数据库发送dump请求,获取binlog并解析

5.canal-php 向 Canal 请求数据库变更

6.Canal 发送解析后的数据给canal-php

7.canal-php收到数据,消费成功,发送回执。(可选)

8.Canal记录消费位置。

架构图

五.快速启动

安装Canal

Canal 的安装以及配置使用请查看 https://github.com/alibaba/canal/wiki/QuickStart

构建canal php客户端

$ composer require xingwenge/canal_php

or

$ git clone https://github.com/xingwenge/canal-php.git
$ cd canal-php
$ composer update

建立与Canal的连接

try {
    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
    # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

    $client->connect("127.0.0.1", 11111);
    $client->checkValid();
    $client->subscribe("1001", "example", ".*\\..*");
    # $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤

    while (true) {
        $message = $client->get(100);
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) {
                Fmt::println($entry);
            }
        }
        sleep(1);
    }

    $client->disConnect();
} catch (\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}

运行效果图

更多详情请查看 Sample

性能

本地开发环境

消费速度:sql insert 10000 事件,32秒消耗完成。消费速度 312.5 条/s。

内存使用:4 M。

canal-php's People

Contributors

kzkzzzz avatar xingwenge avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

canal-php's Issues

PHP Warning: unpack(): Type N: not enough input, need 4, have 0 in

PHP Warning: unpack(): Type N: not enough input, need 4, have 0 in /opt/www/vendor/xingwenge/canal_php/src/adapter/swoole/CanalConnector.php on line 34

Warning: unpack(): Type N: not enough input, need 4, have 0 in /opt/www/vendor/xingwenge/canal_php/src/adapter/swoole/CanalConnector.php on line 34
PHP Notice: Trying to access array offset on value of type bool in /opt/www/vendor/xingwenge/canal_php/src/adapter/swoole/CanalConnector.php on line 34

Notice: Trying to access array offset on value of type bool in /opt/www/vendor/xingwenge/canal_php/src/adapter/swoole/CanalConnector.php on line 34
unexpected packet type:0
PHP Warning: unpack(): Type N: not enough input, need 4, have 0 in /opt/www/vendor/xingwenge/canal_php/src/adapter/swoole/CanalConnector.php on line 34

Warning: unpack(): Type N: not enough input, need 4, have 0 in /opt/www/vendor/xingwenge/canal_php/src/adapter/swoole/CanalConnector.php on line 34
PHP Notice: Trying to access array offset on value of type bool in /opt/www/vendor/xingwenge/canal_php/src/adapter/swoole/CanalConnector.php on line 34

执行一段时间 一直报 batchId:1007 is not the firstly:854

get data error. error code:400, error message:something goes wrong with channel:[id: 0x0f268ec7, /127.0.0.1:58478 => /127.0.0.1:11111], exception=com.alibaba.otter.canal.meta.exception.CanalMetaManagerException: batchId:1007 is not the firstly:854

这个怎么搞

unpack(): Type N: not enough input, need 4, have 0

server端增加了

canal instance user/passwd

canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
后php代码
$client = CanalConnectorFactory::createClient(CanalConnectorFactory::CLIENT_SOCKET);
$client->connect("127.0.0.1", 11111);
$client->checkValid('canal','canal');
$client->subscribe("1001", "test", "trait\..*");

传入用户名密码后,报错unpack(): Type N: not enough input, need 4, have 0
进入文件xingwenge/canal_php/src/adapter/clue/CanalConnector.php
打印$data为空

拉取消息提示packet type=CLIENTAUTHENTICATION is NOT supported!

Exception 'Exception' with message 'get data error. error code:400, error message:packet type=CLIENTAUTHENTICATION is NOT supported!'

in /mnt/c/futu/ns_oss/vendor/xingwenge/canal_php/src/adapter/CanalConnectorBase.php:205

Stack trace:
#0 /mnt/c/futu/ns_oss/vendor/xingwenge/canal_php/src/adapter/CanalConnectorBase.php(151): xingwenge\canal_php\adapter\CanalConnectorBase->getWithoutAck()
#1 /mnt/c/futu/ns_oss/console/controllers/CanalController.php(107): xingwenge\canal_php\adapter\CanalConnectorBase->get()

不能同步mysql blob类型的字符串数据

客户端中,不能同步处理好mysql源表的blog类型的字符串,会有乱码的发生。乱码只产生于 中文字符,英文字符没有问题。
canal版本为1.1.14
canal-python pip客户端为0.4版本

unpack(): Type N: not enough input, need 4, have 0

报错信息在 canal-php/src/adapter/clue/CanalConnector.php on line 29
我尝试打印 $data 变量,值为 ""。

当配置 canal 账户密码时有此错误提示。
$client->checkValid('canal', 'canal');

canal.properties 配置文件:

canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

感谢答复~

php客户端消费速率问题

我搭建的单机canal和单个php客户端来进行消费,中间涉及重组数据再输出到es,所以消费效率不高,35条/s,消费方式是tcp,同步ack。请问在tcp模式下有什么好的办法提高消费效率吗?php客户端如何设置异步ack呢,有例子吗?

请问支持 HA 吗?

docker 里面运行:

    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
    $client->connect("zookeeper", 2181);
php1_1          | Warning: unpack(): Type N: not enough input, need 4, have 0 in /canal/src/adapter/clue/CanalConnector.php on line 29
php1_1          |
php1_1          | Notice: Trying to access array offset on value of type bool in /canal/src/adapter/clue/CanalConnector.php on line 29
php1_1          | Socket operation failed: Success
php1_1          | Socket operation failed: Broken pipe (SOCKET_EPIPE)

有以上报错,请问是不支持 HA 吗,还是我写法错了呢?
感谢!

Connection reset by peer

canal启动后,canal.log会一直报“something goes wrong with channel:[id: 0x7cb4dc0d, /xx.xx.x.xxx:41776 => /xx.xx.x.xxx:11111], exception=java.io.IOException: Connection reset by peer”,出现频率每分钟基本都有,但是canal可以正常监听到binlog数据。

搭建方式:单例Simple(docker容器)
客户端:php客户端(官方demo)

测试成功了,但是没有答应出SQL 语句

-------> before
id : 29 update= false
catid : 6 update= false
modelid : 2 update= false
title : 案例展示 update= false
thumb : https://img.staticdj.com/5db9190f2c7cfa1b8dcd1572b1862965_1080x.jpg update= false
keywords : update= false
description : update= false
listorder : 0 update= false
status : 1 update= false
hits : 5 update= false
username : admin update= false
time : 1392974386 update= false
-------> after

这个没打印出 SQL语句,岂不是要根据
foreach ($rowChange->getRowDatas() as $rowData) {
switch ($evenType) {
case EventType::DELETE:
self::ptColumn($rowData->getBeforeColumns());
break;
case EventType::INSERT:
self::ptColumn($rowData->getAfterColumns());
break;
default:
echo '-------> before', PHP_EOL;
self::ptColumn($rowData->getBeforeColumns());
echo '-------> after', PHP_EOL;
self::ptColumn($rowData->getAfterColumns());
break;
}
}
里面的 增删改查 把每种情况的字段都取出来,然后根据条件 插入到ES ?

能够拿到sql,但是RowData为空

canal-server:1.1.4

按照 sample 样例进行测试,能够正常输出sql

但是打印的 RowData 为空,类型为 \Google\Protobuf\Internal\RepeatedField 并不是期望的 \Com\Alibaba\Otter\Canal\Protocol\RowData[]

查看了 example 和 canal 日志,并没有报错

不知道是什么原因

请问我可以查询到队列里的条数吗?

请问如果我可以查询到队列里的条数吗?如果生产的数据非常多,消费不及时的话。如果感知不到未消费的条数,就无法根据情况去判断要增加的消费进程数量

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.