Giter Site home page Giter Site logo

canal-python's Issues

demo有个bug,

format_data['before']=format_data['after'] = dict() 改为
format_data['before'], format_data['after'] = dict(), dict()

我在查看修改数据库记录前后端的变化,发现修改前后字段的内容都一样,吓我一跳。呵呵。

demo 几个调整建议

1、 # 原代码连等式会造成before 和 after的值全部都是after的值,需分别创建字典。
format_data['before'] = dict()
format_data['after'] = dict()
'''
format_data['before'] = format_data['after'] = dict()
'''
2、 if event_type == EntryProtocol_pb2.EventType.DELETE:
for column in row.beforeColumns:
# 原代码只能记录记录的最后一个字段,需做一下更改,这样可以显示所有字段
format_data[column.name] = column.value
'''
format_data = {
column.name: column.value
}
'''
elif event_type == EntryProtocol_pb2.EventType.INSERT:
for column in row.afterColumns:
# 原代码只能记录记录的最后一个字段,需做一下更改,这样可以显示所有字段
format_data[column.name] = column.value
'''
format_data = {
column.name: column.value
}
'''

ack with error protobuf object, (ACK 的包写错了对象)

源代码:https://github.com/haozi3156666/canal-python/blob/68ecec0795ef712c0c00d001fccb39215cfbc3dc/canal/client.py#L116

        elif packet.type == CanalProtocol_pb2.PacketType.ACK:
            ack = CanalProtocol_pb2.PacketType.Ack()
            ack.MergeFromString(packet.body)
            if ack.error_code > 0:
                raise Exception('get data error. error code:%s, error message:%s' % (ack.error_code, ack.error_message))

CanalProtocol_pb2.PacketType.Ack()是一个枚举?即使是枚举也不能这么写吧。此处ACK回包,应该是返回 CanalProtocol_pb2.Ack()

        elif packet.type == CanalProtocol_pb2.PacketType.ACK:
            ack = CanalProtocol_pb2.Ack()
            ack.MergeFromString(packet.body)
            if ack.error_code > 0:
                raise Exception('get data error. error code:%s, error message:%s' % (ack.error_code, ack.error_message))

否则,运行有可能报错,至于为何不是必现的错误,我也不解,不知是否是bytes流的问题。

client demo 错误

/canal/client.py
id(format_data['before']) == id(format_data['after']) is true

should be : format_data['before'],format_data['after'] = dict(), dict()

canal python client HA

canal的standby节点启动canal python client时报错,主节点可以正常启动并消费。
Exception: Failed to subscribe. error code:400, error message:something goes wrong with channel:[id: 0x781f97b1, /192.168.44.129:52834 => /192.168.44.129:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first

canal-python示例代码问题反馈

https://github.com/bithaolee/canal-python 最后的代码示例 问题修改反馈

  1. 只打印了最后一个字段 format_data每次都重新赋值而不是追加
format_data = dict()
if event_type == EntryProtocol_pb2.EventType.DELETE:
    for column in row.beforeColumns:
        format_data = {
            column.name: column.value
        }
elif event_type == EntryProtocol_pb2.EventType.INSERT:
    for column in row.afterColumns:
        format_data = {
            column.name: column.value
        }

==>

format_data = dict()
if event_type == EntryProtocol_pb2.EventType.DELETE:
    for column in row.beforeColumns:
        format_data[column.name] = column.value                    
elif event_type == EntryProtocol_pb2.EventType.INSERT:
    for column in row.afterColumns:
        format_data[column.name] = column.value
  1. before和after内容一样
format_data['before'] = format_data['after'] = dict()
==>
format_data['before'] = dict()
format_data['after'] = dict()

连接失败

完整日志:

connected to 127.0.0.1:11111
Traceback (most recent call last):
  File "canal_client.py", line 10, in <module>
    client.check_valid(username=b'canal', password=b'canal')
  File "/usr/local/lib/python3.6/site-packages/canal/client.py", line 37, in check_valid
    data = self.connector.read_next_packet()
  File "/usr/local/lib/python3.6/site-packages/canal/connector.py", line 41, in read_next_packet
    data = self.read(self.packet_len)
  File "/usr/local/lib/python3.6/site-packages/canal/connector.py", line 29, in read
    raise Exception('TSocket: Could not read bytes from server')
Exception: TSocket: Could not read bytes from server

不能区分BEGIN ,COMMIT

In [48]: EntryProtocol_pb2.EventType.items()                                                                                                                                                                                                                    
Out[48]: 
[('EVENTTYPECOMPATIBLEPROTO2', 0),
 ('INSERT', 1),
 ('UPDATE', 2),
 ('DELETE', 3),
 ('CREATE', 4),
 ('ALTER', 5),
 ('ERASE', 6),
 ('QUERY', 7),
 ('TRUNCATE', 8),
 ('RENAME', 9),
 ('CINDEX', 10),
 ('DINDEX', 11),
 ('GTID', 12),
 ('XACOMMIT', 13),
 ('XAROLLBACK', 14),
 ('MHEARTBEAT', 15)]

BEGIN和COMMIT的header.eventType都是0, 也就是EntryProtocol_pb2.EventType.Name(0)='EVENTTYPECOMPATIBLEPROTO2'
无法区分begin和commit.

还有一个问题是发现经常获取不到COMMIT, 除非COMMIT比事务中最后一个语句晚6秒执行

event_type为delete、insert时format_data不完全

第28行、第33行 row.beforeColumns、row.afterColumns中包含多个字段的修改
下面遍历语句中对format_data的操作是 format_data = {column.name: column.value} ,这样format_data仅仅会保留row.afterColumns中最后一个字段的信息
建议改成format_data.setdefault(column.name, column.value)

同步blob的字符串中文乱码

客户端中,不能同步处理好mysql源表的blog类型的字符串,会有乱码的发生。乱码只产生于 中文字符,英文字符没有问题。
canal版本为1.1.14
canal-python pip客户端为0.4版本

复原的步骤为:

create table test_blog ( id int not null primary key auto_increment, detail blob not null );

insert into test_blog (detail) values("测试");

此时同步这条数据就是乱码。

建立与Canal的连接代码bug

if event_type == EntryProtocol_pb2.EventType.DELETE:
for column in row.beforeColumns:
format_data = {
column.name: column.value
}

elif event_type == EntryProtocol_pb2.EventType.INSERT:
for column in row.afterColumns:
format_data = {
column.name: column.value
}

format_data = {column.name: column.value} 只会保留最后一个column的数据,
应该是format_data[column.name] = column.value

example.py中第41行存在BUG

format_data['before'] = format_data['after'] = dict()这句表示format_data['before']与format_data['after'] 指向的是同一个地址,修改format_data['before']时format_data['after'] 也会被修改,最终导致无法在mysql row模式下同时取到update前后的数据取值。
建议把第41行修改为:
format_data['before'] = dict()
format_data['after'] = dict()

client.get()报错

Traceback (most recent call last):
File "/Users/admin/Desktop/project/dataCollection/canal.py", line 13, in
message = client.get()
File "/Users/admin/Desktop/project/dataCollection/canal/client.py", line 85, in get
message = self.get_without_ack(size)
File "/Users/admin/Desktop/project/dataCollection/canal/client.py", line 119, in get_without_ack
ack = CanalProtocol_pb2.PacketType.Ack()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/google/protobuf/internal/enum_type_wrapper.py", line 95, in getattr
raise AttributeError
AttributeError

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.