
canal-python's Introduction

canal-python

I. Introduction to canal-python

canal-python is a Python client for Canal, Alibaba's open-source component for incremental subscription to and consumption of the MySQL binlog. It gives Python developers a friendlier way to use Canal.

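Businesses supported by log-based incremental subscription & consumption:

  1. Database mirroring
  2. Real-time database backup
  3. Multi-level indexes (separate sharded indexes for sellers and buyers)
  4. Search build
  5. Business cache refresh
  6. Important business messages such as price changes

For more information about Canal, see https://github.com/alibaba/canal/wiki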

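II. Use cases

As a Canal client, canal-python has the same use cases as Canal itself, which the introduction above outlines. Some concrete examples:

1. Replace polling the database to detect changes, avoiding the database load that polling incurs.

2. Update a search engine in real time from database changes, e.g. in e-commerce, sync product changes to a product search engine such as Elasticsearch or Solr as they happen.

3. Update caches in real time from database changes, e.g. in e-commerce, sync product price and stock changes to Redis as they happen.

4. Off-site database backup and data synchronization.

5. Trigger business logic from database changes, e.g. in e-commerce, when an order is automatically canceled after going unpaid for xx time, we can pick up that order's status change and push a notification to the user.

6. Convert database changes into your own data format and send them to a message queue such as Kafka for its consumers to process.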

III. How it works

canal-python is the Python client for Canal. It communicates with Canal over a socket using TCP as the transport, and the messages exchanged are encoded with Google Protocol Buffers 3.0.
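Protocol Buffers messages do not carry their own length on a TCP stream, so each packet needs framing. The sketch below assumes a 4-byte big-endian length prefix in front of every serialized packet; this matches our understanding of canal-python's connector, but treat it as an assumption and check the source. The helper names `write_packet` and `read_packet` are hypothetical.

```python
import socket
import struct


def _read_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, looping because recv() may return fewer."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError('socket closed before %d bytes were read' % n)
        buf.extend(chunk)
    return bytes(buf)


def write_packet(sock: socket.socket, body: bytes) -> None:
    """Send one frame: assumed 4-byte big-endian length header, then the body."""
    sock.sendall(struct.pack('>i', len(body)) + body)


def read_packet(sock: socket.socket) -> bytes:
    """Receive one frame and return its body (e.g. a serialized protobuf Packet)."""
    (length,) = struct.unpack('>i', _read_exact(sock, 4))
    return _read_exact(sock, length)


if __name__ == '__main__':
    # A connected socket pair stands in for client and Canal server
    a, b = socket.socketpair()
    write_packet(a, b'hello canal')
    print(read_packet(b))
    a.close()
    b.close()
```

The length prefix is what lets the reader know where one protobuf message ends and the next begins, since `recv()` may split or merge writes arbitrarily.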

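IV. Workflow

1. Canal connects to the MySQL database, posing as a replica (slave).

2. canal-python establishes a connection to Canal.

3. A database change is written to the binlog.

4. Canal sends a dump request to the database, fetches the binlog, and parses it.

5. canal-python requests database changes from Canal.

6. Canal sends the parsed data to canal-python.

7. canal-python receives the data, consumes it successfully, and sends an acknowledgment (optional).

8. Canal records the consumption position.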
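The request, acknowledgment, and position-recording steps above can be made explicit: fetch a batch without acknowledging it, process it, then ack on success or roll back on failure so Canal can redeliver the batch. This sketch assumes the client exposes `get_without_ack(size)` returning a dict with `'id'` and `'entries'`, plus `ack(batch_id)` and `rollback(batch_id)`; verify these against your installed canal-python version. `consume_once` and `handle_entries` are hypothetical names.

```python
from typing import Any, Callable, Dict, List


def consume_once(client: Any, handle_entries: Callable[[List[Any]], None], size: int = 100) -> int:
    """Fetch one batch, hand it to handle_entries, then ack or roll back.

    `client` is assumed to be a connected, subscribed canal.client.Client, or any
    object with the same get_without_ack/ack/rollback methods.
    Returns the number of entries processed.
    """
    message: Dict[str, Any] = client.get_without_ack(size)
    batch_id = message['id']
    entries = message['entries']
    if not entries:
        # Assumption: an empty batch has nothing to acknowledge
        return 0
    try:
        handle_entries(entries)
    except Exception:
        # Ask Canal to redeliver this batch instead of recording it as consumed
        client.rollback(batch_id)
        raise
    # Confirm consumption so Canal can advance its recorded position
    client.ack(batch_id)
    return len(entries)
```

Acking only after `handle_entries` returns gives at-least-once delivery: a crash between processing and `ack` means the batch is delivered again, so handlers should tolerate duplicates.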

V. Quick start

Install Canal

For installing, configuring, and using Canal, see https://github.com/alibaba/canal/wiki/QuickStart

Requirements

Python >= 3

Install the canal-python client

pip install canal-python

Connect to Canal

import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2

client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

try:
    while True:
        message = client.get(100)
        entries = message['entries']
        for entry in entries:
            entry_type = entry.entryType
            # Skip transaction boundaries; only row-change entries are printed
            if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                continue
            row_change = EntryProtocol_pb2.RowChange()
            row_change.MergeFromString(entry.storeValue)
            header = entry.header
            database = header.schemaName
            table = header.tableName
            event_type = header.eventType
            for row in row_change.rowDatas:
                format_data = dict()
                if event_type == EntryProtocol_pb2.EventType.DELETE:
                    # Add every column; rebinding format_data here would keep only the last one
                    for column in row.beforeColumns:
                        format_data[column.name] = column.value
                elif event_type == EntryProtocol_pb2.EventType.INSERT:
                    for column in row.afterColumns:
                        format_data[column.name] = column.value
                else:
                    # Two separate dicts; a chained assignment would make before and after the same object
                    format_data['before'] = dict()
                    format_data['after'] = dict()
                    for column in row.beforeColumns:
                        format_data['before'][column.name] = column.value
                    for column in row.afterColumns:
                        format_data['after'][column.name] = column.value
                data = dict(
                    db=database,
                    table=table,
                    event_type=event_type,
                    data=format_data,
                )
                print(data)
        time.sleep(1)
finally:
    # Runs on Ctrl+C or any error, since the loop above never exits on its own
    client.disconnect()

For more details, see Sample.
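Several of the issues below report the same two bugs in this loop (columns overwritten, before/after sharing one dict). Pulling the row formatting into a pure function makes the fix testable without a running Canal server. `format_row` is a hypothetical helper; in real code the comparison values would be `EntryProtocol_pb2.EventType.DELETE` and `INSERT`, and they are passed in here only so the sketch runs without canal installed. The demo values 1, 2, 3 follow the `EventType.items()` listing quoted in one of the issues.

```python
from types import SimpleNamespace
from typing import Any, Dict


def format_row(row: Any, event_type: int, delete_type: int, insert_type: int) -> Dict[str, Any]:
    """Turn one RowData into a dict.

    In real code pass EntryProtocol_pb2.EventType.DELETE / INSERT as delete_type /
    insert_type; they are parameters only so this sketch runs standalone.
    """
    if event_type == delete_type:
        # Deleted rows only have before-images; keep every column, not just the last
        return {column.name: column.value for column in row.beforeColumns}
    if event_type == insert_type:
        return {column.name: column.value for column in row.afterColumns}
    # UPDATE (and other events): two distinct dicts, never a chained assignment
    return {
        'before': {column.name: column.value for column in row.beforeColumns},
        'after': {column.name: column.value for column in row.afterColumns},
    }


if __name__ == '__main__':
    def col(name: str, value: str) -> SimpleNamespace:
        """Stand-in for a protobuf Column with .name and .value."""
        return SimpleNamespace(name=name, value=value)

    row = SimpleNamespace(
        beforeColumns=[col('id', '1'), col('price', '10')],
        afterColumns=[col('id', '1'), col('price', '12')],
    )
    # 2 = UPDATE, 3 = DELETE, 1 = INSERT in the quoted EventType listing
    print(format_row(row, event_type=2, delete_type=3, insert_type=1))
    # {'before': {'id': '1', 'price': '10'}, 'after': {'id': '1', 'price': '12'}}
```

Dict comprehensions build a fresh dict per call, which rules out both the overwrite bug and the aliasing bug by construction.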


canal-python's Issues

ack with error protobuf object (the ACK packet is parsed into the wrong object)

Source code: https://github.com/haozi3156666/canal-python/blob/68ecec0795ef712c0c00d001fccb39215cfbc3dc/canal/client.py#L116

        elif packet.type == CanalProtocol_pb2.PacketType.ACK:
            ack = CanalProtocol_pb2.PacketType.Ack()
            ack.MergeFromString(packet.body)
            if ack.error_code > 0:
                raise Exception('get data error. error code:%s, error message:%s' % (ack.error_code, ack.error_message))

Is CanalProtocol_pb2.PacketType.Ack() an enum? Even if it were, it could not be used like this. The ACK reply here should be parsed into CanalProtocol_pb2.Ack():

        elif packet.type == CanalProtocol_pb2.PacketType.ACK:
            ack = CanalProtocol_pb2.Ack()
            ack.MergeFromString(packet.body)
            if ack.error_code > 0:
                raise Exception('get data error. error code:%s, error message:%s' % (ack.error_code, ack.error_message))

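Otherwise, it may raise an error at runtime. I am also puzzled why the error does not occur every time; I wonder whether it is a bytes-stream issue.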

A few suggested fixes for the demo

1. The chained assignment in the original code leaves both before and after holding the after values; create two separate dicts:
format_data['before'] = dict()
format_data['after'] = dict()
# original: format_data['before'] = format_data['after'] = dict()
2. For DELETE and INSERT, the original code keeps only the record's last column; assign each column by key so that all columns are shown:
if event_type == EntryProtocol_pb2.EventType.DELETE:
    for column in row.beforeColumns:
        # original: format_data = {column.name: column.value}
        format_data[column.name] = column.value
elif event_type == EntryProtocol_pb2.EventType.INSERT:
    for column in row.afterColumns:
        # original: format_data = {column.name: column.value}
        format_data[column.name] = column.value

Cannot distinguish BEGIN from COMMIT

In [48]: EntryProtocol_pb2.EventType.items()
Out[48]: 
[('EVENTTYPECOMPATIBLEPROTO2', 0),
 ('INSERT', 1),
 ('UPDATE', 2),
 ('DELETE', 3),
 ('CREATE', 4),
 ('ALTER', 5),
 ('ERASE', 6),
 ('QUERY', 7),
 ('TRUNCATE', 8),
 ('RENAME', 9),
 ('CINDEX', 10),
 ('DINDEX', 11),
 ('GTID', 12),
 ('XACOMMIT', 13),
 ('XAROLLBACK', 14),
 ('MHEARTBEAT', 15)]

For both BEGIN and COMMIT, header.eventType is 0, i.e. EntryProtocol_pb2.EventType.Name(0) == 'EVENTTYPECOMPATIBLEPROTO2',
so BEGIN and COMMIT cannot be told apart.
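The Quick start code already hints at a workaround: it skips transaction boundaries by checking `entry.entryType` against `TRANSACTIONBEGIN` and `TRANSACTIONEND`, not `header.eventType`. A minimal sketch, assuming that field distinguishes the two as the demo implies; `entry_kind` is a hypothetical helper, and the entry-type enum is passed in so the example runs without canal installed. The numeric values in the demo namespace are stand-ins, not taken from the real proto.

```python
from types import SimpleNamespace
from typing import Any


def entry_kind(entry: Any, entry_types: Any) -> str:
    """Label an Entry as 'BEGIN', 'COMMIT', 'ROWDATA' or 'OTHER'.

    In real code pass EntryProtocol_pb2.EntryType as entry_types; BEGIN and COMMIT
    are told apart by entry.entryType, since header.eventType is 0 for both.
    """
    if entry.entryType == entry_types.TRANSACTIONBEGIN:
        return 'BEGIN'
    if entry.entryType == entry_types.TRANSACTIONEND:
        return 'COMMIT'
    if entry.entryType == entry_types.ROWDATA:
        return 'ROWDATA'
    # e.g. heartbeats or other entry types
    return 'OTHER'


if __name__ == '__main__':
    # Hypothetical stand-in values for EntryProtocol_pb2.EntryType
    types_ = SimpleNamespace(TRANSACTIONBEGIN=1, ROWDATA=2, TRANSACTIONEND=3)
    for t in (1, 2, 3, 4):
        print(t, entry_kind(SimpleNamespace(entryType=t), types_))
```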

Another problem: COMMIT is often not received at all, unless it executes at least 6 seconds after the last statement in the transaction.

Connection failure

Full log:

connected to 127.0.0.1:11111
Traceback (most recent call last):
  File "canal_client.py", line 10, in <module>
    client.check_valid(username=b'canal', password=b'canal')
  File "/usr/local/lib/python3.6/site-packages/canal/client.py", line 37, in check_valid
    data = self.connector.read_next_packet()
  File "/usr/local/lib/python3.6/site-packages/canal/connector.py", line 41, in read_next_packet
    data = self.read(self.packet_len)
  File "/usr/local/lib/python3.6/site-packages/canal/connector.py", line 29, in read
    raise Exception('TSocket: Could not read bytes from server')
Exception: TSocket: Could not read bytes from server

format_data is incomplete when event_type is DELETE or INSERT

On lines 28 and 33, row.beforeColumns and row.afterColumns contain multiple columns.
The loop that follows does format_data = {column.name: column.value}, which rebinds format_data on every iteration, so it keeps only the last column of row.afterColumns.
Suggest changing it to format_data.setdefault(column.name, column.value)

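The demo has a bug:

Change format_data['before']=format_data['after'] = dict() to
format_data['before'], format_data['after'] = dict(), dict()

When I checked how a database record changed before and after an update, the before and after contents were identical, which gave me quite a scare. Heh.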

client demo error

/canal/client.py
id(format_data['before']) == id(format_data['after']) is true

should be: format_data['before'], format_data['after'] = dict(), dict()
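The root cause in these reports is ordinary Python aliasing: a chained assignment evaluates `dict()` once and binds both keys to that single object. A standalone demonstration in plain Python, with no Canal needed (`chained` and `separate` are illustrative names):

```python
def chained() -> dict:
    """Buggy pattern from the demo: both keys share one dict."""
    format_data = {}
    format_data['before'] = format_data['after'] = dict()
    format_data['before']['price'] = '10'
    format_data['after']['price'] = '12'  # also overwrites 'before'
    return format_data


def separate() -> dict:
    """Fixed pattern: two distinct dicts."""
    format_data = {}
    format_data['before'], format_data['after'] = dict(), dict()
    format_data['before']['price'] = '10'
    format_data['after']['price'] = '12'
    return format_data


if __name__ == '__main__':
    print(chained())   # {'before': {'price': '12'}, 'after': {'price': '12'}}
    print(separate())  # {'before': {'price': '10'}, 'after': {'price': '12'}}
```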

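Chinese text in synced BLOB strings is garbled

The client does not correctly handle strings from BLOB columns in the MySQL source table: they come out garbled. Only Chinese characters are affected; English characters are fine.
Canal version: 1.1.14
canal-python pip client version: 0.4

Steps to reproduce:

create table test_blog ( id int not null primary key auto_increment, detail blob not null );

insert into test_blog (detail) values("测试");

Syncing this row then produces garbled text.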
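One plausible explanation, not verified against this exact Canal version: the Canal server converts BLOB bytes into the column's `value` string using ISO-8859-1, so UTF-8 Chinese text arrives as mojibake while ASCII passes through unchanged (which matches the report). If that holds, re-encoding as Latin-1 recovers the raw bytes, which can then be decoded as UTF-8. `decode_blob_value` is a hypothetical helper; apply it only to BLOB columns, for example by checking `column.mysqlType`, a field assumed to exist on the Column message.

```python
def decode_blob_value(value: str, encoding: str = 'utf-8') -> str:
    """Recover text from a BLOB column value delivered as a Latin-1-decoded string.

    Assumes the server mapped each raw byte to one code point (ISO-8859-1), so
    encoding back to latin-1 is lossless and yields the original bytes.
    Raises UnicodeEncodeError if the value contains code points above U+00FF,
    i.e. if that assumption does not hold.
    """
    raw = value.encode('latin-1')
    return raw.decode(encoding)


if __name__ == '__main__':
    # Simulate what the client would receive for values("测试") under this assumption
    received = '测试'.encode('utf-8').decode('latin-1')
    print(repr(received))
    print(decode_blob_value(received))  # 测试
```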

Bug on line 41 of example.py

format_data['before'] = format_data['after'] = dict() makes format_data['before'] and format_data['after'] point to the same object, so modifying format_data['before'] also modifies format_data['after']. As a result, in MySQL row mode you cannot get both the pre-update and post-update values.
Suggest changing line 41 to:
format_data['before'] = dict()
format_data['after'] = dict()

client.get() raises an error

Traceback (most recent call last):
  File "/Users/admin/Desktop/project/dataCollection/canal.py", line 13, in <module>
    message = client.get()
  File "/Users/admin/Desktop/project/dataCollection/canal/client.py", line 85, in get
    message = self.get_without_ack(size)
  File "/Users/admin/Desktop/project/dataCollection/canal/client.py", line 119, in get_without_ack
    ack = CanalProtocol_pb2.PacketType.Ack()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/google/protobuf/internal/enum_type_wrapper.py", line 95, in __getattr__
    raise AttributeError
AttributeError

Bug in the "Connect to Canal" code

if event_type == EntryProtocol_pb2.EventType.DELETE:
    for column in row.beforeColumns:
        format_data = {
            column.name: column.value
        }

elif event_type == EntryProtocol_pb2.EventType.INSERT:
    for column in row.afterColumns:
        format_data = {
            column.name: column.value
        }

format_data = {column.name: column.value} keeps only the last column's data;
it should be format_data[column.name] = column.value

canal python client HA

Starting canal python client against a Canal standby node fails with the error below, while against the primary node it starts and consumes normally.
Exception: Failed to subscribe. error code:400, error message:something goes wrong with channel:[id: 0x781f97b1, /192.168.44.129:52834 => /192.168.44.129:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first
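The error says the destination has not started on the node the client reached; in a Canal HA setup the standby instance is, as we understand it, not running until it takes over, so the client must talk to the active node. Without relying on ZooKeeper discovery (which this sketch does not attempt), one workaround is to try candidate servers in order and keep the first one where subscribing succeeds. `connect_first_available` and the server list are hypothetical; the `Client` calls mirror the Quick start code, and the broad `except Exception` reflects that canal-python raises plain `Exception` on server errors.

```python
from typing import Any, Callable, Iterable, List, Tuple


def connect_first_available(
    make_client: Callable[[], Any],
    servers: Iterable[Tuple[str, int]],
    destination: bytes,
    client_id: bytes = b'1001',
    filter_: bytes = b'.*\\..*',
) -> Any:
    """Return a subscribed client for the first server that accepts the subscription.

    make_client should build a fresh canal.client.Client; the connect, check_valid,
    subscribe and disconnect calls follow the Quick start example.
    """
    errors: List[str] = []
    for host, port in servers:
        client = make_client()
        try:
            client.connect(host=host, port=port)
            client.check_valid(username=b'', password=b'')
            client.subscribe(client_id=client_id, destination=destination, filter=filter_)
            return client
        except Exception as exc:  # e.g. "destination:example should start first" on a standby
            errors.append('%s:%s -> %s' % (host, port, exc))
            try:
                client.disconnect()
            except Exception:
                pass  # the connection may never have been established
    raise ConnectionError('no Canal server accepted the subscription: ' + '; '.join(errors))
```

This only picks a node at startup; it does not follow a failover that happens while the client is already consuming.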

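Feedback on issues in the canal-python sample code

Suggested fixes for the code sample at the end of https://github.com/bithaolee/canal-python

  1. Only the last column is printed: format_data is reassigned on every iteration instead of being extended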
format_data = dict()
if event_type == EntryProtocol_pb2.EventType.DELETE:
    for column in row.beforeColumns:
        format_data = {
            column.name: column.value
        }
elif event_type == EntryProtocol_pb2.EventType.INSERT:
    for column in row.afterColumns:
        format_data = {
            column.name: column.value
        }

==>

format_data = dict()
if event_type == EntryProtocol_pb2.EventType.DELETE:
    for column in row.beforeColumns:
        format_data[column.name] = column.value                    
elif event_type == EntryProtocol_pb2.EventType.INSERT:
    for column in row.afterColumns:
        format_data[column.name] = column.value
  2. before and after have identical contents
format_data['before'] = format_data['after'] = dict()
==>
format_data['before'] = dict()
format_data['after'] = dict()
