bithaolee / canal-python
Alibaba canal client (Python 3 version)
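Change format_data['before'] = format_data['after'] = dict() to
format_data['before'], format_data['after'] = dict(), dict()
I was checking how a database record changed before and after an update and found that the field values before and after were identical, which gave me a scare. Heh.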
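The identical before/after values come from Python's chained assignment: `a = b = dict()` evaluates `dict()` once and binds both targets to the same object. A minimal, self-contained demonstration (no canal dependency; the column name `xa` is just sample data):

```python
def build_buggy():
    format_data = {}
    # Chained assignment: both keys reference ONE dict object
    format_data['before'] = format_data['after'] = dict()
    format_data['before']['xa'] = '1'  # value before the UPDATE
    format_data['after']['xa'] = '2'   # overwrites the same dict
    return format_data


def build_fixed():
    format_data = {}
    # Tuple assignment: two separate dict objects
    format_data['before'], format_data['after'] = dict(), dict()
    format_data['before']['xa'] = '1'
    format_data['after']['xa'] = '2'
    return format_data


buggy = build_buggy()
fixed = build_fixed()
print(buggy)  # {'before': {'xa': '2'}, 'after': {'xa': '2'}}
print(fixed)  # {'before': {'xa': '1'}, 'after': {'xa': '2'}}
```

Writing the two assignments on separate lines, as other reports suggest, is equivalent to the tuple form.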
def check_valid(self, username=b'', password=b''):
Tested 10 UPDATEs: sometimes only 1 or 2 are synced, sometimes all 10 are.
Different client_id values cannot each consume their own copy of the data.
For example, after client_id = b"1001" has consumed the data,
client_id = b"1002" can no longer consume it.
1. # The original chained assignment makes both before and after hold the after values; create a separate dict for each.
format_data['before'] = dict()
format_data['after'] = dict()
'''
format_data['before'] = format_data['after'] = dict()
'''
2. if event_type == EntryProtocol_pb2.EventType.DELETE:
    for column in row.beforeColumns:
        # The original code only kept the last field of the row; changed so that all fields are shown
        format_data[column.name] = column.value
        '''
        format_data = {
            column.name: column.value
        }
        '''
elif event_type == EntryProtocol_pb2.EventType.INSERT:
    for column in row.afterColumns:
        # The original code only kept the last field of the row; changed so that all fields are shown
        format_data[column.name] = column.value
        '''
        format_data = {
            column.name: column.value
        }
        '''
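The "only the last field survives" bug is the difference between rebinding the name and adding a key. A runnable sketch; `Column` is a hypothetical stand-in for the protobuf column message, assuming only its `.name` and `.value` attributes are used:

```python
from collections import namedtuple

# Hypothetical stand-in for the protobuf Column message (.name and .value only)
Column = namedtuple('Column', ['name', 'value'])


def columns_buggy(columns):
    format_data = dict()
    for column in columns:
        # Rebinds format_data on every iteration: only the last column survives
        format_data = {column.name: column.value}
    return format_data


def columns_fixed(columns):
    format_data = dict()
    for column in columns:
        # Adds one key per column to the same dict
        format_data[column.name] = column.value
    return format_data


row = [Column('id', '2'), Column('xa', '2'), Column('note', 'hello')]
print(columns_buggy(row))  # {'note': 'hello'}
print(columns_fixed(row))  # {'id': '2', 'xa': '2', 'note': 'hello'}
```

A dict comprehension, `{c.name: c.value for c in columns}`, produces the same result as `columns_fixed`.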
elif packet.type == CanalProtocol_pb2.PacketType.ACK:
    ack = CanalProtocol_pb2.PacketType.Ack()
    ack.MergeFromString(packet.body)
    if ack.error_code > 0:
        raise Exception('get data error. error code:%s, error message:%s' % (ack.error_code, ack.error_message))
Is CanalProtocol_pb2.PacketType.Ack() an enum? Even if it were, it could not be written like this. This branch handles the ACK response packet, so it should use CanalProtocol_pb2.Ack():
elif packet.type == CanalProtocol_pb2.PacketType.ACK:
    ack = CanalProtocol_pb2.Ack()
    ack.MergeFromString(packet.body)
    if ack.error_code > 0:
        raise Exception('get data error. error code:%s, error message:%s' % (ack.error_code, ack.error_message))
Otherwise it can raise an error at runtime. I don't understand why the error is not reproducible every time; I wonder whether it is a bytes-stream issue.
/canal/client.py
id(format_data['before']) == id(format_data['after']) is true
should be: format_data['before'], format_data['after'] = dict(), dict()
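Starting the canal Python client against a canal standby node fails with the error below, while against the primary node it starts and consumes normally.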
Exception: Failed to subscribe. error code:400, error message:something goes wrong with channel:[id: 0x781f97b1, /192.168.44.129:52834 => /192.168.44.129:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first
{'db': 'pex', 'table': 'a', 'event_type': 2, 'data': {'before': {'id': '2', 'xa': '2'}, 'after': {'id': '2', 'xa': '2'}}},
after and before are identical
Feedback on fixing the last code example in https://github.com/bithaolee/canal-python
format_data = dict()
if event_type == EntryProtocol_pb2.EventType.DELETE:
    for column in row.beforeColumns:
        format_data = {
            column.name: column.value
        }
elif event_type == EntryProtocol_pb2.EventType.INSERT:
    for column in row.afterColumns:
        format_data = {
            column.name: column.value
        }
==>
format_data = dict()
if event_type == EntryProtocol_pb2.EventType.DELETE:
    for column in row.beforeColumns:
        format_data[column.name] = column.value
elif event_type == EntryProtocol_pb2.EventType.INSERT:
    for column in row.afterColumns:
        format_data[column.name] = column.value
format_data['before'] = format_data['after'] = dict()
==>
format_data['before'] = dict()
format_data['after'] = dict()
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\.(name1|name2)')
The filter in this call has no effect.
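Before suspecting the server, it can help to confirm the pattern itself. As I understand the canal docs, a filter is one or more Perl-style regexes over `schema.table`, separated by commas. The sketch below emulates that locally under two assumptions: whole-string matching, and no commas inside individual regexes. `matches_filter` is a hypothetical helper, not part of canal-python. Note also that `\.` in a plain `b'...'` literal is an unrecognized escape that Python keeps as-is but newer versions warn about; `rb'...'` states the backslash explicitly.

```python
import re


# Hypothetical helper: emulates a whole-string match of a canal-style filter
# (comma-separated regexes over "schema.table"); the real matching happens on the server
def matches_filter(filter_expr: bytes, schema: str, table: str) -> bool:
    name = f"{schema}.{table}"
    patterns = filter_expr.decode('utf-8').split(',')
    return any(re.fullmatch(p, name) for p in patterns if p)


# rb'' keeps the backslash explicitly and avoids the invalid-escape warning
FILTER = rb'.*\.(name1|name2)'

print(matches_filter(FILTER, 'pex', 'name1'))  # True
print(matches_filter(FILTER, 'pex', 'a'))      # False
```

If the pattern matches as expected locally but rows from other tables still arrive, the cause is more likely in how the server applies the subscription filter than in the regex.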
Full log:
connected to 127.0.0.1:11111
Traceback (most recent call last):
  File "canal_client.py", line 10, in <module>
    client.check_valid(username=b'canal', password=b'canal')
  File "/usr/local/lib/python3.6/site-packages/canal/client.py", line 37, in check_valid
    data = self.connector.read_next_packet()
  File "/usr/local/lib/python3.6/site-packages/canal/connector.py", line 41, in read_next_packet
    data = self.read(self.packet_len)
  File "/usr/local/lib/python3.6/site-packages/canal/connector.py", line 29, in read
    raise Exception('TSocket: Could not read bytes from server')
Exception: TSocket: Could not read bytes from server
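In canal 1.4, if canal-admin is used, canal-server requires the client to authenticate even when no username or password is configured in canal_local.properties, but canal-python lacks this feature and cannot pass the check.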
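Hi, in your example the message acknowledgement (ack) is never called, and it is described as optional.
If some messages were not synced or consumed, how do I fetch those unconsumed messages next time?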
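A common at-least-once pattern is: fetch a batch without acknowledging it, process every entry, and only then acknowledge; if processing fails, roll the batch back so that, as I understand canal's semantics, the server redelivers it from the last acknowledged position. The sketch assumes the client offers `get_without_ack(size)` returning a dict with `'id'` and `'entries'`, plus `ack(batch_id)` and `rollback(batch_id)`, as in canal-python's `client.py` (verify against your installed version). `consume_forever` and `FakeClient` are hypothetical names, used only to keep the example runnable.

```python
import time


def consume_forever(client, handle, batch_size=100, idle_sleep=1.0, max_loops=None):
    """Fetch -> process -> ack; roll back on failure so the batch is redelivered.

    Assumes `client` offers get_without_ack(size), ack(batch_id), rollback(batch_id).
    """
    loops = 0
    while max_loops is None or loops < max_loops:
        loops += 1
        message = client.get_without_ack(batch_size)
        batch_id, entries = message['id'], message['entries']
        if not entries:
            if batch_id:
                client.ack(batch_id)  # empty but valid batch: release it
            time.sleep(idle_sleep)
            continue
        try:
            for entry in entries:
                handle(entry)
        except Exception:
            client.rollback(batch_id)  # ask the server to redeliver this batch
            raise
        else:
            client.ack(batch_id)  # confirm only after every entry was handled


class FakeClient:
    """Hypothetical in-memory stand-in used only to exercise consume_forever."""

    def __init__(self, batches):
        self.batches = list(batches)
        self.acked, self.rolled_back = [], []

    def get_without_ack(self, size):
        return self.batches.pop(0) if self.batches else {'id': 0, 'entries': []}

    def ack(self, batch_id):
        self.acked.append(batch_id)

    def rollback(self, batch_id):
        self.rolled_back.append(batch_id)


seen = []
fake = FakeClient([{'id': 1, 'entries': ['e1', 'e2']}, {'id': 2, 'entries': ['e3']}])
consume_forever(fake, seen.append, idle_sleep=0, max_loops=3)
print(seen, fake.acked)  # ['e1', 'e2', 'e3'] [1, 2]
```

Re-raising after the rollback leaves the retry policy to the caller; silently looping could spin forever on an entry that always fails.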
Have you looked at this error?
In [48]: EntryProtocol_pb2.EventType.items()
Out[48]:
[('EVENTTYPECOMPATIBLEPROTO2', 0),
('INSERT', 1),
('UPDATE', 2),
('DELETE', 3),
('CREATE', 4),
('ALTER', 5),
('ERASE', 6),
('QUERY', 7),
('TRUNCATE', 8),
('RENAME', 9),
('CINDEX', 10),
('DINDEX', 11),
('GTID', 12),
('XACOMMIT', 13),
('XAROLLBACK', 14),
('MHEARTBEAT', 15)]
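The header.eventType of both BEGIN and COMMIT is 0, i.e. EntryProtocol_pb2.EventType.Name(0) == 'EVENTTYPECOMPATIBLEPROTO2',
so BEGIN and COMMIT cannot be told apart.
Another problem: COMMIT is often not received at all unless it is executed at least 6 seconds after the last statement in the transaction.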
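In canal's `EntryProtocol.proto`, transaction boundaries are carried by the entry-level `entryType` field (`TRANSACTIONBEGIN`, `TRANSACTIONEND`) rather than by `header.eventType`, which is why `eventType` reads 0 for both. A sketch of classifying by `entryType`; the numeric values are assumed to mirror that enum, so in real code prefer the generated `EntryProtocol_pb2.EntryType` constants, and `Entry` here is a hypothetical stand-in for the protobuf message:

```python
from collections import namedtuple

# Assumed to mirror EntryProtocol.proto's EntryType enum; prefer the generated
# EntryProtocol_pb2.EntryType constants in real code
TRANSACTIONBEGIN = 1
ROWDATA = 2
TRANSACTIONEND = 3

# Hypothetical stand-in for the protobuf Entry message
Entry = namedtuple('Entry', ['entryType'])


def classify(entry):
    """Label an entry by its entryType instead of header.eventType."""
    if entry.entryType == TRANSACTIONBEGIN:
        return 'BEGIN'
    if entry.entryType == TRANSACTIONEND:
        return 'COMMIT'
    if entry.entryType == ROWDATA:
        return 'ROWDATA'
    return 'OTHER'


print([classify(Entry(t)) for t in (1, 2, 2, 3)])
# ['BEGIN', 'ROWDATA', 'ROWDATA', 'COMMIT']
```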
Lines 28 and 33: row.beforeColumns and row.afterColumns contain changes to multiple fields.
The loop below assigns format_data = {column.name: column.value}, so format_data keeps only the last field of row.afterColumns.
Suggest changing it to format_data.setdefault(column.name, column.value)
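The client does not correctly handle strings from BLOB columns in the MySQL source table; the output is garbled. Only Chinese characters are garbled; English characters are fine.
canal version: 1.1.14
canal-python pip client version: 0.4
Steps to reproduce:
create table test_blog ( id int not null primary key auto_increment, detail blob not null );
insert into test_blog (detail) values("测试");
The synced row for this insert is garbled.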
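One plausible cause, assuming canal-server transports BLOB/binary values as a string decoded from the raw bytes with ISO-8859-1 (Latin-1): each UTF-8 byte of a Chinese character then arrives as a separate Latin-1 character, while ASCII text is unaffected, which matches the symptom. Under that assumption, and if the stored bytes are UTF-8, re-encoding as Latin-1 recovers them. `fix_blob_text` is a hypothetical helper; apply it only to BLOB columns (for example after checking the column's `mysqlType`), because applying it to ordinary text columns would corrupt them.

```python
def fix_blob_text(value: str, encoding: str = 'utf-8') -> str:
    """Recover text from a BLOB value that arrived as a Latin-1 view of raw bytes.

    Assumes the server decoded the raw bytes with ISO-8859-1 and that the
    original content was stored in `encoding`.
    """
    raw = value.encode('latin-1')  # 1:1 mapping back to the original bytes
    return raw.decode(encoding)


# Simulate what the client receives for "测试" stored in a BLOB column
received = '测试'.encode('utf-8').decode('latin-1')
print(len(received))            # 6: one character per UTF-8 byte
print(fix_blob_text(received))  # 测试
```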
if event_type == EntryProtocol_pb2.EventType.DELETE:
    for column in row.beforeColumns:
        format_data = {
            column.name: column.value
        }
elif event_type == EntryProtocol_pb2.EventType.INSERT:
    for column in row.afterColumns:
        format_data = {
            column.name: column.value
        }
format_data = {column.name: column.value} keeps only the data of the last column;
it should be format_data[column.name] = column.value
Running example.py directly: after restarting the canal server, data that was already consumed and acked is consumed again.
format_data['before'] = format_data['after'] = dict() makes format_data['before'] and format_data['after'] point to the same object, so modifying format_data['before'] also modifies format_data['after']. As a result, in MySQL ROW mode the values before and after an UPDATE cannot both be obtained.
Suggest changing line 41 to:
format_data['before'] = dict()
format_data['after'] = dict()
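Version: canal 1.1.4
client.check_valid(username=b'', password=b''): I'm not sure which username and password to fill in here.
In my tests, the client works normally when canal server is started in non-local mode, but fails when canal server is started in local mode.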
Traceback (most recent call last):
  File "/Users/admin/Desktop/project/dataCollection/canal.py", line 13, in <module>
    message = client.get()
  File "/Users/admin/Desktop/project/dataCollection/canal/client.py", line 85, in get
    message = self.get_without_ack(size)
  File "/Users/admin/Desktop/project/dataCollection/canal/client.py", line 119, in get_without_ack
    ack = CanalProtocol_pb2.PacketType.Ack()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/google/protobuf/internal/enum_type_wrapper.py", line 95, in __getattr__
    raise AttributeError
AttributeError