itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primitive data types, maps, and arrays to ClickHouse.

License: Apache License 2.0

Java 92.61% Shell 7.39%
clickhouse flink connector flink-connector


flink-connector-clickhouse's People

Contributors

czy006, itinycheng, liyubin117, lxorc, lyfee, snuyanzin


flink-connector-clickhouse's Issues

Data write fails when using FixedString

When a string longer than N is written to a ClickHouse column of type FixedString(N), the job shows the SUCCESS state with no exception of any kind, but the ClickHouse table contains no data at all: the entire batch is gone.
Is this reasonable?
According to the official documentation, a "Too large value for FixedString(N)" exception should be thrown.
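
A possible mitigation sketch, assuming the target column is FixedString(8); table and column names here are illustrative, not from the report. Truncating on the Flink side keeps every value within N, so the batch can no longer be rejected or silently dropped (note SUBSTRING counts characters, so multi-byte strings may still exceed a byte-sized FixedString):

INSERT INTO ck_sink
SELECT
  id,
  -- keep at most 8 characters so the FixedString(8) column is never overflowed
  SUBSTRING(code FROM 1 FOR 8) AS code
FROM source_table;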

sink.flush-interval has no effect

With the settings
'sink.batch-size' = '100',
'sink.flush-interval' = '5000'
and a small input of 160 rows, only part of the data is sunk; even after the sink.flush-interval period has elapsed, the remaining rows are never flushed.
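
For reference, a minimal sink definition with both options in context (host, schema, and table are placeholders): with these settings a flush should be triggered either by the row count or by the timer, whichever fires first, which is exactly what the report says does not happen for the tail of the 160 rows.

CREATE TABLE ck_sink (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://127.0.0.1:8123',
  'database-name' = 'default',
  'table-name' = 'demo',
  -- flush when 100 rows are buffered ...
  'sink.batch-size' = '100',
  -- ... or every 5000 ms, whichever comes first
  'sink.flush-interval' = '5000'
);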

sink.ignore-delete

Setting sink.ignore-delete = 'false' raises:
java.lang.UnsupportedOperationException: Please use prepareStatement(ClickHouseConnection connection) instead.
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.prepareStatement(ClickHouseUpsertExecutor.java:63)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.open(ClickHouseBatchOutputFormat.java:63)
... 13 more

e.displayText() = DB::Exception: Mutations are not supported by storage Distributed

How to solve the ClickHouse distributed write problem?
The following error occurs when I use a regular statement:
Code: 48, e.displayText() = DB::Exception: There was an error on [chi-bpiih-clickhouse-gxqtcluster-0-0:9000]: Code: 48, e.displayText() = DB::Exception: Mutations are not supported by storage Distributed (version 21.1.3.32) (version 21.1.3.32)

at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:59) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:29) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:1094) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:773) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeStatement(ClickHouseStatementImpl.java:255) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeBatch(ClickHouseStatementImpl.java:593) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:388) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:364) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:53) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125) ~[flink-connector-clickhouse-1.14.3-SNAPSHOT.jar:?]

Two-table join in Flink sinking to ClickHouse: when old data is updated in MySQL, the row is deleted in ClickHouse and never re-inserted

I want to use Flink CDC SQL to build a wide table (sink_ck) in ClickHouse. I create two or more Flink CDC source tables from MySQL, e.g. tb1 and tb2.
I submit this job via the Flink client:
insert into sink_ck SELECT a.*,b.order_name FROM tb1 a LEFT JOIN tb2 b ON a.order_id = b.order_id
When a tb1 row is updated in MySQL, the row is deleted in sink_ck. Why?
I want an update of a tb1 row in MySQL to update the same row in ClickHouse.

Sinking a single table to ClickHouse with Flink CDC works fine: insert / update / delete are all handled correctly!

A single-table update works; the statements issued are:
INSERT INTO ck_order(id, order_id, merchant_code, merchant_name, create_time, update_time) FORMAT TabSeparated
INSERT INTO ck_order(id, order_id, merchant_code, merchant_name, create_time, update_time) FORMAT TabSeparated
ALTER TABLE flink.ck_order UPDATE order_id='20042100002', merchant_code='BU0000000002YGP', merchant_name='广州2贸易有限公司_2', create_time='2021-04-21 10:41:22', update_time='2022-04-25 10:33:58' WHERE id=2
ALTER TABLE flink.ck_order UPDATE order_id='20042100002', merchant_code='BU0000000002YGP', merchant_name='广州2贸易有限公司_2', create_time='2021-04-21 10:41:22', update_time='2022-04-25 10:33:58' WHERE id=2

A wide-table update issues a DELETE instead of an ALTER TABLE ... UPDATE:
INSERT INTO ck_order_detail(id, order_id, merchant_code, merchant_name, create_time, update_time, order_name) FORMAT TabSeparated
INSERT INTO ck_order_detail(id, order_id, merchant_code, merchant_name, create_time, update_time, order_name) FORMAT TabSeparated
ALTER TABLE flink.ck_order_detail DELETE WHERE id=2
ALTER TABLE flink.ck_order_detail DELETE WHERE id=2

CDC update events written to ClickHouse in real time produce wrong data

Suppose the following pipeline:
mysql-cdc (debezium-json) -> clickhouse

debezium-json converts an update in the CDC changelog into two operations (-Delete followed by +Insert).

When this event stream is written to ClickHouse in real time through flink-clickhouse-connector, the delete triggered by the -Delete event runs asynchronously, so the execution order can get scrambled: the +Insert may execute first and the -Delete afterwards.
In that scenario the data ends up wrong: an update in the source ultimately shows up in ClickHouse as a delete.
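
One commonly used mitigation, not a fix for the ordering itself, is to drop delete messages at the sink and let the ClickHouse table engine deduplicate. This is only a sketch under the assumption that the target is a ReplacingMergeTree-style table keyed on the primary key; names are illustrative:

CREATE TABLE ck_sink (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://127.0.0.1:8123',
  'database-name' = 'default',
  'table-name' = 'demo',
  -- drop -D messages so the asynchronous DELETE can no longer race the +I;
  -- relies on the ClickHouse engine (e.g. ReplacingMergeTree) to dedup by key
  'sink.ignore-delete' = 'true'
);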

Batch write fails after 3 retries

Hi, after the streaming job had been writing for about an hour, the batch flush started failing and all 3 retries were exhausted. The log follows:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:77)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:16)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at StreamExecCalc$149.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.io.IOException: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:76)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:67)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74)
... 9 more

Duplicate data is read under parallelism > 1

Hi, I downloaded the code to test it and found that when none of the scan.partition.* options are set, a source running with parallelism N reads N copies of the data. How can this be solved? Thanks!
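
For reference, a partitioned-scan sketch, assuming the connector follows the scan.partition.* option names mentioned in the report (values are illustrative): each parallel subtask then reads only its own id range instead of the whole table.

CREATE TABLE ck_source (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://127.0.0.1:8123',
  'database-name' = 'default',
  'table-name' = 'demo',
  -- split the scan on id into 4 disjoint ranges, one per subtask
  'scan.partition.column' = 'id',
  'scan.partition.num' = '4',
  'scan.partition.lower-bound' = '0',
  'scan.partition.upper-bound' = '10000'
);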

Maven Dependency

I want to use it, but the Maven dependency cannot be found. What is the Maven repository URL?

Data loss

create table ods_mall_info (
id BIGINT,
name STRING,
shortName STRING,
mallType TINYINT,
phone STRING,
provinceCode STRING,
cityCode STRING,
countyCode STRING,
address STRING,
longitude Decimal(18,6),
latitude Decimal(18,6),
state STRING,
createTime TIMESTAMP,
createBy BIGINT,
image STRING,
codeImageCreateTime TIMESTAMP,
codeImageState STRING,
codeImage STRING,
documentType STRING,
socialCreditCode STRING,
licenseEffectiveDate STRING,
businessScope STRING,
ICPNumber STRING,
webAddress STRING,
registeredCaptial BIGINT,
paidCaptial BIGINT,
foundedDate STRING,
industry STRING,
registeredAddress STRING,
businessAddress STRING,
socialCreditImage STRING,
corporateName STRING,
corporatePhone STRING,
corporateDocumentType STRING,
corporateDocumentNumber STRING,
documentStartEndDate STRING,
documentFrontImage STRING,
documentBackImage STRING,
oms_code STRING,
coefficient STRING,
updateTime TIMESTAMP,
updateBy BIGINT,
delFlag STRING,
codeImageRecharge STRING,
mallDistinction STRING,
showDefaultPic TINYINT,
mall_dis STRING,
real_business TINYINT
) WITH (
'connector' ='clickhouse',
'url' ='clickhouse://:8123',
'database-name' ='hxhdbtest',
'table-name' = 'ods_mall_info',
'username' = 'default',
'password' = ''
);
create table ods_business_lymerch_detail
(
id BIGINT,
mall_guid String,
mall_mdmid String,
mall_class String,
mall_code String,
mall_status String,
mall_type String,
address String,
area String,
bi_code String,
b_reg_code String,
b_reg_name String,
city_code String,
city_name String,
consumer_hotline String,
country_code String,
country_name String,
full_name String,
gd_gps String,
gps String,
is_open_up TINYINT,
is_up_app TINYINT,
is_up_ly TINYINT,
is_up_sap TINYINT,
oms_code String,
open_date String,
operation_name String,
post_code String,
province_code String,
prv_name String,
ps_code String,
short_name String,
s_reg_code String,
telephone String,
composite_merch String,
del_flag TINYINT,
create_date_time timestamp
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://:8123',
'database-name' = 'hxhdbtest',
'table-name' = 'ods_business_lymerch_detail',
'username' = 'default',
'password' = ''
);
CREATE TABLE ods_mall_city
(
Region_Identifier string,
Chinese_Abbreviation string,
Area_Code string,
Full_Name_Pinyin string,
Full_Name string,
Area_Name string,
Superior_Area_Code string,
Pinyin string,
AreaType string
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://:8123',
'database-name' = 'hxhdbtest',
'table-name' = 'ods_mall_city',
'username' = 'default',
'password' = ''
);
CREATE TABLE dim_mall
(
hxh_mall_id String,
hxh_mall_name String,
hxh_mall_short_name String,
mall_address String,
mall_type INT,
mall_kind String,
mall_area String,
open_date String,
oms_code String,
b_reg_code String,
b_reg_name String,
bi_code String,
ps_code String,
province_code String,
province_name String,
city_code String,
city_name String,
country_code String,
country_name String,
consumer_hotline String,
mall_longitude Decimal(18, 6),
mall_latitude Decimal(18, 6),
city_longitude Decimal(18, 6),
city_latitude Decimal(18, 6),
gd_gps String,
tecent_gps String,
hxh_mall_status String,
mall_industry String,
mall_distinction String,
ly_mall_id String,
ly_mall_guid String,
ly_mall_mdmid String,
ly_mall_code String,
ly_mall_class String,
ly_mall_name String,
state String,
ori_create_time TIMESTAMP(0),
ori_update_time TIMESTAMP(0),
dim_create_date TIMESTAMP(0),
PRIMARY KEY (hxh_mall_id) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://:8123',
'database-name' = 'hxhdbtest',
'table-name' = 'dim_mall',
'username' = 'default',
'password' = ''
);

INSERT
INTO
dim_mall
SELECT
cast(mi.id AS VARCHAR),
-- mall code
mi.name AS hxh_mall_name,
-- mall name
mi.shortName AS hxh_mall_short_name,
-- mall short name
bld.address AS mall_address,
-- address
mi.mallType AS mall_type,
-- mall type
bld.mall_type AS mall_kind,
-- mall category
bld.area AS mall_area,
-- mall area
bld.open_date AS open_date,
-- opening date
mi.oms_code AS oms_cod,
-- Longyi mall correlation code
bld.b_reg_code AS b_reg_code,
-- operations development center code
bld.b_reg_name AS b_reg_name,
-- operations development center name
bld.bi_code AS bi_code,
-- BI code
bld.ps_code AS ps_code,
-- PS code
mi.provinceCode AS province_code,
-- province code
oc_province.Area_Name AS province_name,
-- province name
mi.cityCode AS city_code,
-- city code
oc_city.Area_Name AS city_name,
-- city name
mi.countyCode AS country_code,
-- county code
oc_county.Area_Name AS country_name,
-- county name
bld.consumer_hotline AS consumer_hotline,
-- consumer hotline
mi.longitude AS longitude,
-- mall longitude
mi.latitude AS latitude,
0 AS mall_longitude,
0 AS mall_latitude,
-- mall latitude
bld.gd_gps AS gd_gps,
-- Amap (Gaode) coordinates
bld.gps AS tecent_gps,
-- Tencent Maps coordinates
mi.state AS hxh_mall_status,
-- mall status
mi.industry AS mall_industry,
-- industry
mi.mallDistinction AS mall_distinction,
-- mall flag: 1 = visible mall, 2 = hidden mall
cast(bld.id AS VARCHAR) AS ly_mall_id,
-- Longyi mall code
bld.mall_guid AS ly_mall_guid,
-- Longyi mall guid
bld.mall_mdmid AS ly_mall_mdmid,
-- Longyi mall mdmid
bld.mall_code AS ly_mall_code,
-- Longyi mall number
bld.mall_class AS ly_mall_class,
-- Longyi mall name
bld.full_name as ly_mall_name,
-- merchant onboarding status (1: available)
bld.mall_status,
mi.createTime AS business_create_date,
mi.updateTime AS update_create_date,
now()
FROM
ods_mall_info mi
LEFT JOIN (
SELECT
*
FROM
ods_business_lymerch_detail
WHERE
del_flag = 0
) bld ON
mi.oms_code = bld.oms_code
LEFT JOIN (
SELECT
Area_Code,
Area_Name
FROM
ods_mall_city
WHERE
AreaType = '2'
) oc_province ON
oc_province.Area_Code = mi.provinceCode
LEFT JOIN (
SELECT
Area_Code,
Area_Name
FROM
ods_mall_city
WHERE
AreaType = '3'
) oc_city ON
oc_city.Area_Code = mi.cityCode
LEFT JOIN (
SELECT
Area_Code,
Area_Name
FROM
ods_mall_city
WHERE
AreaType = '4'
) oc_county ON
oc_county.Area_Code = mi.countyCode;

The above is the original SQL. After the LEFT JOIN the row count indeed shrank: ods_mall_info has 40 rows and ods_business_lymerch_detail only 11, yet the final result contains 29 rows, i.e. everything except the 11 rows that matched the right table. If the insert target is switched to connector='print', the three-step + / - / + changelog for those 11 rows is clearly visible.

When I add 'sink.batch-size' = '1' on the insert table, no data is lost.
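
For reference, the reported workaround applied to the dim_mall sink above: with a batch size of 1, every changelog message is flushed on its own, so the retract (-D) and re-insert (+I) of an updated key cannot be reordered inside one batch (at the cost of throughput). This reflects only what the report observed, not a general fix:

CREATE TABLE dim_mall (
  -- ... columns as defined above ...
  PRIMARY KEY (hxh_mall_id) NOT ENFORCED
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://:8123',
  'database-name' = 'hxhdbtest',
  'table-name' = 'dim_mall',
  'username' = 'default',
  'password' = '',
  -- flush every record individually so -D/+I pairs keep their order
  'sink.batch-size' = '1'
);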

Caused by: java.io.IOException: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3

at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:259)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:244)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:162)
at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:444)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
Caused by: java.io.IOException: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at com.ververica.cdc.connectors.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:76)
at com.ververica.cdc.connectors.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:94)
at com.ververica.cdc.connectors.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at com.ververica.cdc.connectors.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:67)

The clickhouse exception stack can only be viewed in the taskmanager log

The code that executes the SQL

ClickHouseExecutor#attemptExecuteBatch

    default void attemptExecuteBatch(ClickHousePreparedStatement stmt, int maxRetries)
            throws SQLException {
        for (int i = 0; i < maxRetries; i++) {
            try {
                stmt.executeBatch();
                return;
            } catch (Exception exception) {
                // The real ClickHouse exception is only logged here, i.e. it ends up
                // in the taskmanager log; it is never attached to the SQLException
                // thrown below.
                LOG.error("ClickHouse executeBatch error, retry times = {}", i, exception);
                try {
                    Thread.sleep(1000L * i);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new SQLException(
                            "Unable to flush; interrupted while doing another attempt", ex);
                }
            }
        }
        // Thrown without a cause, so the Flink UI exceptions tab only ever shows
        // this generic message.
        throw new SQLException(
                String.format(
                        "Attempt to execute batch failed, exhausted retry times = %d", maxRetries));
    }

No concrete error message in the Flink UI exceptions tab

From the error log we can only see that three retries happened, and meanwhile the job keeps restarting internally. From a business perspective, users usually only look at the Flink UI exceptions tab.

Viewing taskmanager.log

If the job runs on YARN and has died, the error information cannot be seen in the jobmanager log either.

mysql cdc update Exception: There was an error on : Cannot execute replicated DDL query on leader

Updating data against a Distributed table with use-local=true produces errors, while the same update against a local table does not.
Inserting data works fine against both the Distributed table with use-local=true and the local table.

e.displayText() = DB::Exception: There was an error on Cannot execute replicated DDL query on leader

at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:59)
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:29)
at ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:1094)
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:773)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeStatement(ClickHouseStatementImpl.java:255)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeBatch(ClickHouseStatementImpl.java:593)
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:388)
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:364)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:53)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74)
at org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat.flush(ClickHouseShardOutputFormat.java:160)
at org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat.flush(ClickHouseShardOutputFormat.java:154)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

dialect

Does ClickHouse have a dialect?

mysql -> flink-1.13.5 bin/sql-client.sh -> clickhouse

mysql: 'connector' = 'mysql-cdc'
clickhouse: this connector

CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.20.187',
'port' = '3309',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);

CREATE TABLE clickhouseorders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://192.168.20.187:8123',
'database-name' = 'test',
'table-name' = 'clickhouseorders',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
INSERT INTO clickhouseorders
SELECT o.order_id,o.order_date, o.customer_name, o.price
FROM orders AS o;

result:
2022-02-14 14:42:44
java.lang.NoClassDefFoundError: org/apache/flink/util/concurrent/ExecutorThreadFactory
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.scheduledFlush(AbstractClickHouseOutputFormat.java:52)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.open(ClickHouseBatchOutputFormat.java:69)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)

What should I do next? Thanks.

Using the use-local option: Unable to establish connection to ClickHouse

sql:
CREATE TABLE sink_table (
  ID String,
  NAME String,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'clickhouse',
  'sink.batch-size' = '500',
  'sink.ignore-delete' = 'false',
  'sink.flush-interval' = '1000',
  'sink.max-retries' = '300',
  'use-local' = 'true',
  'url' = 'clickhouse://127.0.0.1:8123',
  'database-name' = 'default',
  'username' = 'clickhouse',
  'password' = 'clickhouse',
  'table-name' = 'demo'
)

error:
java.io.IOException: Unable to establish connection to ClickHouse
at org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:106)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
Caused by: ru.yandex.clickhouse.except.ClickHouseUnknownException: ClickHouse exception, code: 1002, host: 127.0.0.1, port: 9000; Magic is not correct: 60
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.getException(ClickHouseExceptionSpecifier.java:92)
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:56)
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:25)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:351)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:324)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:319)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:314)
at ru.yandex.clickhouse.ClickHouseConnectionImpl.initConnection(ClickHouseConnectionImpl.java:91)
at ru.yandex.clickhouse.ClickHouseConnectionImpl.<init>(ClickHouseConnectionImpl.java:78)
at ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:62)
at ru.yandex.clickhouse.BalancedClickhouseDataSource.getConnection(BalancedClickhouseDataSource.java:195)
at org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider.createConnection(ClickHouseConnectionProvider.java:129)
at org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider.createAndStoreShardConnection(ClickHouseConnectionProvider.java:95)
at org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider.createShardConnections(ClickHouseConnectionProvider.java:82)
at org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:85)
... 14 more
Caused by: java.io.IOException: Magic is not correct: 60
at ru.yandex.clickhouse.response.ClickHouseLZ4Stream.readNextBlock(ClickHouseLZ4Stream.java:94)
at ru.yandex.clickhouse.response.ClickHouseLZ4Stream.checkNext(ClickHouseLZ4Stream.java:75)
at ru.yandex.clickhouse.response.ClickHouseLZ4Stream.read(ClickHouseLZ4Stream.java:51)
at ru.yandex.clickhouse.response.StreamSplitter.readFromStream(StreamSplitter.java:92)
at ru.yandex.clickhouse.response.StreamSplitter.next(StreamSplitter.java:53)
at ru.yandex.clickhouse.response.ClickHouseResultSet.<init>(ClickHouseResultSet.java:95)
at ru.yandex.clickhouse.ClickHouseStatementImpl.createResultSet(ClickHouseStatementImpl.java:1121)
at ru.yandex.clickhouse.ClickHouseStatementImpl.updateResult(ClickHouseStatementImpl.java:224)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:344)
... 25 more

Missing data

insert into select a left join b on ...
The resulting data is missing the rows that satisfy the ON condition; only the non-matching rows remain.

Maven package cannot find javac

The following dependency has to be added to the pom. I don't know why, though; does the author know?

<dependency>
    <groupId>com.google.errorprone</groupId>
    <artifactId>javac</artifactId>
    <version>9+181-r4173-1</version>
</dependency>

Data is not sunk to ClickHouse

SQL Server sinking to ClickHouse: the data never lands and no error is reported. A breakpoint on writeRecord in ClickHouseBatchOutputFormat shows the records are received, and when stepping through with the debugger the data does get sunk; without the breakpoint nothing is written.

java.lang.RuntimeException: Flush exception found.

Flink version 1.14.4, CDC version 2.2.1, and the latest ClickHouse connector jar from the 1.14.3 branch. After syncing a single table for a while, the job fails with java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:77)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:16)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:50)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:143)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:157)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:139)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:118)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:54)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:72)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$25(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.RuntimeException: Flush exception found.
... 11 more
(the same cause is repeated many more times)

mysql-cdc to clickhouse: error on update

Exception in thread "main" org.apache.flink.table.api.TableException: Failed to wait job finish
at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
at com.demo.mysql.v2.CDCMain.main(CDCMain.java:89)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:60)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:60)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 5 more
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:77)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:16)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:50)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at StreamExecCalc$7.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:143)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:157)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:139)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:118)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:54)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:156)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:72)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.RuntimeException: Flush exception found.
... 12 more
(the same "Flush exception found" cause is repeated many more times)
Caused by: java.io.IOException: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:76)
... 10 more
Caused by: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:67)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74)
... 10 more

Flink SQL sink to ClickHouse

Hi, a question: when sinking Flink SQL data to ClickHouse, can the ClickHouse table map the Flink SQL ROW data type to ClickHouse's JSON data type?
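
ClickHouse's JSON/Object type is still experimental, and I'm not aware of the connector mapping ROW to it directly, so this likely does not work out of the box. A safe fallback is to serialize the ROW upstream and store it in a String column. A minimal sketch (host, database, and column names are hypothetical):

-- ClickHouse side: keep the structured value as a String column
CREATE TABLE db.t (
    id      Int32,
    payload String
) ENGINE = MergeTree ORDER BY id;

-- Flink side: declare the sink column as STRING and write the serialized ROW
CREATE TABLE ck_sink (
    id      INT,
    payload STRING
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://host:8123',
    'database-name' = 'db',
    'table-name' = 't'
);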

Querying ClickHouse data

Hi, a question: can I query ClickHouse data in (near) real time? For example, one table is written to continuously, and another program needs to repeatedly query all of that table's data from midnight up to now (say 20 million rows) and match it against a stream. Is that feasible?
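
If the connector build in use supports reading (scan source), a periodic bounded query should work; note that each run reads a snapshot, not a continuous changelog, so the 20-million-row match would re-execute per query. A sketch, with table and column names hypothetical:

-- register the ClickHouse table as a Flink source (sketch)
CREATE TABLE ck_source (
    id      BIGINT,
    created TIMESTAMP(0)
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://host:8123',
    'database-name' = 'db',
    'table-name' = 't'
);

-- bounded snapshot of today's rows so far
SELECT * FROM ck_source
WHERE created >= CAST(CURRENT_DATE AS TIMESTAMP);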

Error syncing Hudi data to ClickHouse

1. Error: Unable to create a sink for writing table 'default_catalog.default_database.ck'
2. Error: java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/CatalogTable;
3. Maven could not pull the artifact, so I manually downloaded the 1.12 build
4. My Flink version is 1.13.6

The ClickHouse connector options are:
'connector'='clickhouse'
'database-name'='XX'
'sink.batch-size'='500'
'sink.flush-interval'='1000'
'sink.max-retries'='3'
'table-name'='ck'
'url'='clickhouse://XX:9876'
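
The NoSuchMethodError points to a version mismatch: the 1.12 build was compiled against DynamicTableFactory.Context.getCatalogTable() returning CatalogTable, while Flink 1.13 changed that method's return type (to ResolvedCatalogTable), so a connector jar built for Flink 1.13 is needed. For reference, the options above assembled into a full DDL (the column list is a placeholder):

CREATE TABLE ck (
    id   BIGINT,
    name STRING
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://XX:9876',
    'database-name' = 'XX',
    'table-name' = 'ck',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);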

Hoping for some help.

It doesn't work with Flink 1.14.0

Got the error message below.
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/runtime/util/ExecutorThreadFactory
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.scheduledFlush(AbstractClickHouseOutputFormat.java:59)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.open(ClickHouseBatchOutputFormat.java:68)

It seems org/apache/flink/runtime/util/ExecutorThreadFactory was removed from Flink 1.14.0 (the class appears to have moved to org.apache.flink.util.concurrent.ExecutorThreadFactory), so a connector built against an earlier Flink fails to load it. Do you have a plan to upgrade to Flink 1.14.0?

A few questions

  1. Suppose 10 parallel subtasks write to the local tables, and there are 3 local tables. Does each subtask write to a single table, or round-robin across all three? (See the option sketch after these questions.)
  2. If the server restarts, can the writer retry against a replica node and keep writing?
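
Presumably question 1 comes down to the sink's partitioning options seen elsewhere in this tracker: 'sink.partition-strategy' (with 'sink.partition-key' for hash partitioning) decides which local table a row is routed to, and 'sink.distribute-table-write-local' toggles writing to the shard-local tables directly. A sketch with illustrative values only:

-- illustrative only: route rows to shard-local tables by hashing on id
CREATE TABLE ck_sink (
    id   INT,
    name STRING
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://host:8123',
    'database-name' = 'db',
    'table-name' = 'dist_table',
    'sink.distribute-table-write-local' = 'true',
    'sink.partition-strategy' = 'hash',
    'sink.partition-key' = 'id'
);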

org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor - ClickHouse executeBatch error

Running in upsert mode throws an error; details below:
160123 [clickhouse-batch-output-format-thread-1] ERROR org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor - ClickHouse executeBatch error, retry times = 0
java.sql.SQLSyntaxErrorException: Query must be like 'INSERT INTO [db.]table [(c1, c2, c3)] VALUES (?, ?, ?)'. Got: ALTER TABLE default.order UPDATE userName=?, orderID=? WHERE orderName=?
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:327)
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:320)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:53)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

The code is as follows:
(screenshot attached)
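
The message itself shows the root cause: for update rows the upsert executor issues a ClickHouse mutation (ALTER TABLE ... UPDATE), but ru.yandex.clickhouse's ClickHousePreparedStatementImpl.executeBatch only accepts plain INSERT statements, so the batch is rejected. The two statement shapes involved:

-- generated for update rows; the JDBC driver cannot batch this:
ALTER TABLE default.order UPDATE userName = ?, orderID = ? WHERE orderName = ?;

-- the only shape executeBatch accepts:
INSERT INTO default.order (userName, orderID, orderName) VALUES (?, ?, ?);

If strict upsert semantics can be relaxed, an insert-only sink plus key-based deduplication in ClickHouse (see the CDC issues further down) avoids mutations entirely.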

Flink TIME field into a ClickHouse DateTime column fails on write

CREATE TABLE source (
    `id` INT,
    name VARCHAR,
    t    TIME
) WITH (
    'connector' = 'datagen'
);

CREATE TABLE sink (
    `id` INT,
    name VARCHAR,
    t    TIME,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:clickhouse://url.163.org:8123/',
    'database-name' = 'lyblocal',
    'username' = 'default',
    'password' = '',
    'table-name' = 'mt3',
    'sink.distribute-table-write-local' = 'false',
    'sink.ignore-delete' = 'false',
    'sink.partition-strategy' = 'hash',
    'sink.partition-key' = 'id',
    'sink.buffer-flush.max-rows' = '1'
);

INSERT INTO sink
SELECT id, name, t FROM source;

The error is as follows:
Column 0, name: id, type: Int32, parsed text: "-816858032"
Column 1, name: name, type: String, parsed text: "9fb989b731b7d0f32d66dee746308ee77eb311db99a220adc78817b43e23c36ae32975e02834ec1f54308fd10d7f60f8c2fb"
Column 2, name: t, type: DateTime, parsed text: "17"ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.
Code: 41, e.displayText() = DB::ParsingException: Cannot parse datetime
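
The mismatch is the third column: Flink TIME serializes as time-of-day only (the parsed text "17..."), while ClickHouse DateTime requires 'YYYY-MM-DD hh:mm:ss' or a 10-digit unix timestamp. A possible workaround (a sketch, untested): declare the sink column as TIMESTAMP instead of TIME and widen the value before writing, e.g. by prefixing the current date:

-- sketch: turn the TIME into a full TIMESTAMP so ClickHouse can parse it
INSERT INTO sink
SELECT
    id,
    name,
    CAST(CONCAT(DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd '), CAST(t AS STRING)) AS TIMESTAMP(0))
FROM source;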

branch 1.12: SQL validation failed

Using the 1.12 branch: after building it and adding it to my project, executing SQL fails validation, even for the simplest query, select name from student.
The same code runs without problems on 1.13 and 1.14.
Tracing through the code, it fails at SqlNode parsed = parser.parse(statement); inside ParserImpl.
Judging by the stack trace below, the 1.12 planner calls ClickHouseCatalog.getTableStatistics during validation (via DatabaseCalciteSchema.extractTableStats), and that method throws UnsupportedOperationException; returning unknown statistics instead of throwing would likely fix it. Is this a bug in 1.12?

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. null
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
at com.big.data.app.test4.main(test4.java:37)
Caused by: java.lang.UnsupportedOperationException
at org.apache.flink.connector.clickhouse.catalog.ClickHouseCatalog.getTableStatistics(ClickHouseCatalog.java:452)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:117)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:104)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:81)
at java.util.Optional.map(Optional.java:215)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:77)
at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3205)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3187)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3461)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)

ClickHouse executeBatch error

When I use flink-connector-clickhouse, the following exception occurs occasionally:
(screenshot attached)

2022-05-09 12:26:29,777 ERROR org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat [] - ClickHouse executeBatch error, retry times = 0
ru.yandex.clickhouse.except.ClickHouseUnknownException: ClickHouse exception, code: 1002, host: clickhouse-prod.xxx.com, port: 8123; null
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.getException(ClickHouseExceptionSpecifier.java:92) ~[flink-connector-clickhouse-1.12.0.jar:?]
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:42) ~[flink-connector-clickhouse-1.12.0.jar:?]
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:25) ~[flink-connector-clickhouse-1.12.0.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1071) ~[flink-connector-clickhouse-1.12.0.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1026) ~[flink-connector-clickhouse-1.12.0.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1019) ~[flink-connector-clickhouse-1.12.0.jar:?]
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:381) ~[flink-connector-clickhouse-1.12.0.jar:?]
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:364) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseBatchExecutor.executeBatch(ClickHouseBatchExecutor.java:72) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.attemptFlush(AbstractClickHouseOutputFormat.java:84) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:66) ~[flink-connector-clickhouse-1.12.0.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_322]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_322]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_322]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911) ~[?:1.8.0_322]
at java.util.ArrayList$Itr.next(ArrayList.java:861) ~[?:1.8.0_322]
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl$BatchHttpEntity.writeTo(ClickHousePreparedStatementImpl.java:419) ~[flink-connector-clickhouse-1.12.0.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl$WrappedHttpEntity.writeTo(ClickHouseStatementImpl.java:98) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:152) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[flink-connector-clickhouse-1.12.0.jar:?]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[flink-connector-clickhouse-1.12.0.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1059) ~[flink-connector-clickhouse-1.12.0.jar:?]
... 15 more

The root cause is the nested java.util.ConcurrentModificationException inside BatchHttpEntity.writeTo: the batch list is modified while the scheduled flush thread is serializing it over HTTP, i.e. a thread-safety issue between the writer and the flush path rather than a ClickHouse server error.

Unexpected additional rows in ClickHouse sink table for flink CDC procedure

Flink Version : 1.13.2
Flink CDC version: 2.2
flink-connector-clickhouse version : 1.13-release

  1. The workflow is as follows:
    MySQL (8.0.28) -> Flink CDC -> Flink -> ClickHouse.

(1) Configuration: the ClickHouse sink table is a local table (engine: MergeTree), configured as below:
" 'connector' = 'clickhouse'," +
" 'url' = 'clickhouse://clickhouse-server:8123',"+
" 'username' = 'user'"+
" 'password' = 'pass'"+
" 'database-name' = 'database'" +
" 'table-name' = 'clickhouse_sink_table'" +
" 'sink.batch-size' = '500'" +
" 'sink.flush-interval' = '1000'" +
" 'sink.max-retries' = '3'" +

(2) Relevant Flink Java SQL code:
String sql =
"INSERT INTO clickhouse_sink_table SELECT vrfi.*, DATE_FORMAT(vrfi.created, 'yyyy-MM') FROM mysql_source_table as vrfi\n";
statementSet.addInsertSql(sql);

  2. The table in MySQL has 539905 rows:
     select count(id) from sbtest1;
     +-----------+
     | count(id) |
     +-----------+
     |    539905 |
     +-----------+
     1 row in set (0.05 sec)

  3. Row count in the ClickHouse sink table:
     SELECT COUNT(id) FROM clickhouse_sink_table;

SELECT COUNT(id)
FROM clickhouse_sink_table

Query id: 775dd633-9a8b-4c13-b272-3b317f0575ab

┌─count(id)─┐
│    556114 │
└───────────┘

  4. But with COUNT(DISTINCT ...) the result is as expected:
     SELECT COUNT(distinct(id)) FROM clickhouse_sink_table;

SELECT COUNTDistinct(id)
FROM clickhouse_sink_table

Query id: 3e6dc971-47e0-4ca0-836b-eaabf280d2e2

┌─uniqExact(id)─┐
│        539905 │
└───────────────┘

  5. I found a similar issue: https://stackoverflow.com/questions/53442559/how-to-avoid-duplicates-in-clickhouse-table. Would it help to change the engine to ReplicatedMergeTree or ReplacingMergeTree?
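
ReplacingMergeTree is the usual answer for upsert-style CDC streams: rows sharing the same ORDER BY key are deduplicated during background merges, and a query with FINAL deduplicates at read time (ReplicatedMergeTree is about replication rather than key-based deduplication). A minimal sketch, assuming id is the dedup key and the other columns mirror the Flink schema:

-- ClickHouse side (sketch)
CREATE TABLE clickhouse_sink_table (
    id      Int64,
    created DateTime
) ENGINE = ReplacingMergeTree()
ORDER BY id;

-- exact counts then require FINAL (or waiting for merges):
SELECT count(id) FROM clickhouse_sink_table FINAL;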

Thanks for your help.

Update operation exception

When I use SQL Server CDC as the source, inserting and deleting records in the SQL Server table works fine, but updating a record fails and the job exits.

CREATE TABLE ch_user (
id int NOT NULL,
name varchar(50),
comment varchar(255),
create_time timestamp,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://xxxxx:8123',
'database-name' = 'default',
'table-name' = 'user',
'username' = 'default',
'password' = '123456',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
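
This looks like the same failure mode as the upsert issue above: update rows become ClickHouse mutations, which fail and exhaust 'sink.max-retries'. If strict upsert semantics can be relaxed, a hedged workaround is an insert-only sink (the 'sink.ignore-delete' semantics here are assumed from other reports in this tracker) combined with the ReplacingMergeTree deduplication sketched in the previous issue:

CREATE TABLE ch_user (
    id int NOT NULL,
    name varchar(50),
    comment varchar(255),
    create_time timestamp,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://xxxxx:8123',
    'database-name' = 'default',
    'table-name' = 'user',
    'username' = 'default',
    'password' = '123456',
    'sink.ignore-delete' = 'true',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);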

2022-03-03 15:14:48
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:77)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:16)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:252)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:237)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:163)
at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:446)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.lang.RuntimeException: Flush exception found.
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:72)
at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Flush exception found.
... 11 more
(this "Caused by" / "... 11 more" pair repeats 19 times in total)
Caused by: java.io.IOException: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:76)
... 9 more
Caused by: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:67)
at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125)
at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74)
... 9 more
