Comments (10)
@wuchong @PatrickRen 麻烦解答下?谢谢~
from flink-cdc-connectors.
FlinkDatabaseHistory
存储的是 schema 变更,不是 records 数据,不会那么大。 不过这块确实可以想办法优化一下,换成非内存模式。
from flink-cdc-connectors.
FlinkDatabaseHistory
存储的是 schema 变更,不是 records 数据,不会那么大。 不过这块确实可以想办法优化一下,换成非内存模式。
@wuchong 谢谢jark,对Debezium还不是很了解,我再去补补课。
感觉这个connector很神奇,感觉比接kafka方便很多,我试用了下这个cdc-connector,实验环境里mysql两个表join打成一个宽表,看了下实验效果确实是会把历史数据与实时变更的数据都能打到宽表里。但看代码,在原理上有点没想通,为啥历史数据也会被join到,难道是我的实验环境里binlog是有全部的历史行数据和历史ddl变更导致的?
请问下:
1、这个connector在功能上就是支持历史存量数据join操作,不依赖binlog里存在全部的历史行数据变更吗(因为生产环境里binlog里不存储所有历史数据变更比较常见)?
2、如果能做,想问下,他们全量的历史数据(启动的时候,mysql表里的所有数据快照,select * from table)是怎么保存的或者说全量的历史数据怎么能输入到这个cdc conector里的,并能参与到后续的flink sql join操作里的。
问这个两个的主要原因是我的场景里,锁拿不到,业务表数据量也比较大。
1、这个cdc connector需要一个全局锁,目前在生产环境中应用的时候,锁控制的很严格,拿不到全局读锁或表锁。
2、历史全量表会比较大,并且还在不停的增长,如果fail over的时候,存量历史数据是从哪里恢复过来的,从而还能保证正确的join结果输出?
所以想着能不能把存量数据与实时变更数据结合起来,想看看能不能改一下?(我们的场景可以不需要精确一次的语义的)
再次麻烦jark了~
from flink-cdc-connectors.
- 因为这个 connector 会先扫描全表读历史数据,然后再切换到对应的binlog 位点读增量的更新数据。
- source 不用保存啊,直接下发给下游就行拉。
- 拿不到全局读锁是因为 DBA 不肯给么?
- 如果再读历史全量的过程中 fail 了,那么恢复的时候会重新读历史全量。也就是说读历史全量过程中不会做 checkpoint。
from flink-cdc-connectors.
- 因为这个 connector 会先扫描全表读历史数据,然后再切换到对应的binlog 位点读增量的更新数据。
- source 不用保存啊,直接下发给下游就行拉。
- 拿不到全局读锁是因为 DBA 不肯给么?
- 如果再读历史全量的过程中 fail 了,那么恢复的时候会重新读历史全量。也就是说读历史全量过程中不会做 checkpoint。
1、锁权限(表锁或全局锁)dba有一套规范,不肯给。我感觉我们的场景下,表的schema基本不会做变更(频率很低,并且只要做变更都要做通知),应该也是可以直接跳过去获取锁??
2、结合你的讲解,带着疑问去看代码,理解又更深了一点。看这个 cdc connector的source是核心是通过debezium connector的engine来获取历史数据与binlog增量数据的,source是先emitRecords历史全量数据,再发emitRecords增量数据发送至下游。全量的过程,结合flink打的日志,DebeziumEngine 是有一个debezium/connector/mysql/SnapshotReader.java 获取锁,做表的扫描,每10000行还会输出下时间消耗。
这里还有一个关于内存不懂的地方,当数据往事下游流的时候,基本上,就是过flink的各种算子或者是写的sql转成的算子了进行运算了。像我们这种几个表做join的打成宽表的场景,例如
(
// 输入表test
CREATE TABLE test (
idINT,
nameVARCHAR(255),
timeTIMESTAMP(3),
status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '1',
'database-name' = 'ai_ask',
'table-name' = 'test'
);
// 输入表status
CREATE TABLE status (
id
INT,
name
VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '1',
'database-name' = 'ai_task',
'table-name' = 'status'
);
// 输出宽表test_status
CREATE TABLE test_status (
id
INT,
name
VARCHAR(255),
time
TIMESTAMP(3),
status
INT,
status_name
VARCHAR(255)
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/ai_task',
'connector.table' = 'test_status',
'connector.username' = 'root',
'connector.password' = '1',
'connector.write.flush.max-rows' = '1'
);
// 计算出宽表
INSERT into test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;
`
),
想问下jack, 数据往flink计算引擎流入的过程中,会做join打成宽表的逻辑,这里做join就会依赖的数据,比如status 一个id的name变化了,test_status相应的会有关联N条数据变化,这个join过程关联的N条test表数据是会怎么在flink计算引擎里保存的?
量大的时候会不会有内存不足的问题?不理解flink cdc sql语句和这些数据的保存之间会有什么关系,数据保存是依赖于flink 的statebackend吗?
@wuchong 望帮忙解答下,有没有哪方面的资源可以加深下理解的~
from flink-cdc-connectors.
- 如果表 schema 不做变更,那可以尝试跳过拿锁阶段。
- join 算子会用 state 来保存输入流的数据,生产上一般会用 rocksdb statebackend,所以不会有内存 OOM 的问题。 flink 提供的 cdc connector 其实只是去接了 binlog 的数据,和 state 没有什么关系,source 也只是记一个位点,不会存数据。就是是普通的数据流 join 也是会有 state 的。
from flink-cdc-connectors.
- 如果表 schema 不做变更,那可以尝试跳过拿锁阶段。
- join 算子会用 state 来保存输入流的数据,生产上一般会用 rocksdb statebackend,所以不会有内存 OOM 的问题。 flink 提供的 cdc connector 其实只是去接了 binlog 的数据,和 state 没有什么关系,source 也只是记一个位点,不会存数据。就是是普通的数据流 join 也是会有 state 的。
好的,结合你的讲解,再看看代码,感觉理解更深入了一步。整理下,拿上面的宽表场景,flink cdc join操作来看,串起来后,应该是这样?
1、cdc connector 先将存量数据发完(scan存量数据期间持有checkpoint操作需要的锁,所以无法做checkpoint动作,完后再释放锁),开始发送增量数据时后,才开始cdc connector开始checkpoint binlog的分区与offsize。当在发送增量数据阶段,job失败重启,cdc connector从checkpoint点重启,只消费增量数据,不会再scan全量数据了;join算子从state checkpoint中拿到join的中间表结果,继续恢复计算。join算子里的state大小会跟表的数据量级相关,但用了rocksdb,数据都在硬盘上,没OOM问题。
2、若cdc connector 没有scan完,scan部分存量数据就失败了,因为没有checkpoint,所以任务重启的时候会重新scan数据。
3、锁问题可以通过配置配debezium参数跳过。 debezium 有个开关,MySqlConnectorConfig.SnapshotLockingMode.NONE
if (!snapshotLockingMode.equals(MySqlConnectorConfig.SnapshotLockingMode.NONE) && useGlobalLock) { //Snapshot 锁获取 …… }
@wuchong jark,麻烦看看正确不?
另外目前试了下,cdc connector 在scan存量数据时好像效率有点低,4百万的存量数据,做这个宽表计算, 然后再存sql,实测要1个多小时。感觉有点慢,这里好像无解,看cdc connector 依赖的debezium单线程在读全表??
jark有啥优化思路或用法上的建议提升效率不?
from flink-cdc-connectors.
- 正确。
- 正确。
- 是的。你可以在 DDL 直接通过 'debezium.snapshot.mode' = 'never' 参数就可以控制,见文档。
- 这个应该是你下游效率太低,导致 source 被反压了,你可以看看下游为什么这么慢? 并发低?磁盘性能不行?
from flink-cdc-connectors.
- 正确。
- 正确。
- 是的。你可以在 DDL 直接通过 'debezium.snapshot.mode' = 'never' 参数就可以控制,见文档。
- 这个应该是你下游效率太低,导致 source 被反压了,你可以看看下游为什么这么慢? 并发低?磁盘性能不行?
关于4, 用blackhole验证试了下,确实快了很多。在这个测试场景里,应该是下游慢反压了。
CREATE TABLE test_status (
id
INT,
name
VARCHAR(255),
time
TIMESTAMP(3),
status
INT,
status_name
VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'blackhole'
);
但总感觉这里的全量scan操作要是能并行就好了,就像jdbc connector一样,能依据某个column设置并行扫描('scan.partition.column' = '%s)会更快些~~
要是能并行描,可以加速下job上线时间。
from flink-cdc-connectors.
这里主要是要保证 snapshot 和 binlog offset 的一致性,所以只能单并发运行 :(
from flink-cdc-connectors.
Related Issues (20)
- flinksql设置并发后,会报错Database connection failed when starting copy(postgres-cdc) HOT 2
- Error due to Flink CDC using deprecated `flink-conf.yaml` in light of Flink's full YAML 1.2 support since version 1.19 HOT 2
- The XML dependency of Flink Oracle CDC conflicts with the XML related classes of Java8:ERROR StatusLogger Caught javax.xml.parsers.ParserConfigurationException HOT 3
- 如何通过yarn-session运行flink cdc作业?
- Oracle problem,sync fail HOT 1
- flink mongodb cdc 方式同步数据,mongodb 4.2版本的不支持,update的更新数据同步吗 HOT 4
- I found flink-cdc supported connectors using YAML definition,its very convenient,i love it.so my question is when other pipeline is supported?for example,i want sync data form oracle to pg or pg to mysql.Thanks! HOT 1
- Flink Mongodb CDC在遇到DDL操作(eg: db.c.drop()),程序报错, 期待相应功能的支持 HOT 1
- SQL Server does not support scan. startup. mode: initial only HOT 2
- flink cdc 2.4.2版本关于报错io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4 的问题 HOT 1
- Exception in thread "main" java.lang.NoSuchMethodError: 'void org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory.<init>(org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink, boolean, boolean)' HOT 3
- Information that has disappeared from github appears on the web page. HOT 1
- DataStreamSource oracle ID HOT 1
- Information that has disappeared from github appears on the web page. HOT 2
- supported mysql8.4? HOT 1
- Flink 1.18.1 and 1.19.0 having build jdk 1.8 and causing incompatibilities with Java 17 HOT 3
- update operation does not take effect HOT 5
- mysql pipeline connector lost table which database and table with the same name
- Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.doris.flink.cfg.DorisOptions to field org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier.dorisOptions of type org.apache.doris.flink.cfg.DorisOptions in instance of org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier HOT 1
- flink-cdc sql sqlserver HOT 4
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from flink-cdc-connectors.