A data integration framework

Home Page: https://dtstack.github.io/chunjun/

License: Apache License 2.0

Java 97.34% Shell 0.42% Dockerfile 0.25% FreeMarker 1.10% TypeScript 0.85% JavaScript 0.02% CSS 0.03%
flink bigdata data-integration framework java

ChunJun

Introduction

ChunJun is a distributed data integration framework, currently built on Apache Flink. It was initially known as FlinkX and was renamed ChunJun on February 22, 2022. It enables data synchronization and computation across heterogeneous data sources, and it has been deployed and running stably at thousands of companies.

Official website of ChunJun: https://dtstack.github.io/chunjun/

Features of ChunJun

ChunJun abstracts different databases into reader/source plugins, writer/sink plugins and lookup plugins, and it has the following features:

  • Based on the real-time computing engine Flink; tasks can be configured as JSON templates or as SQL scripts, and the SQL scripts are compatible with Flink SQL syntax (a minimal example job script follows this list);
  • Supports distributed execution, with flink-standalone, yarn-session, yarn-per-job and other submission modes;
  • Supports one-click Docker deployment, as well as deploying and running on Kubernetes;
  • Supports a wide variety of heterogeneous data sources: synchronization and computation for more than 20 sources, such as MySQL, Oracle, SQL Server, Hive and Kudu;
  • Easy to extend and highly flexible: newly added data source plugins integrate with existing ones instantly, and plugin developers do not need to care about the code logic of other plugins;
  • Supports not only full synchronization but also incremental synchronization and interval polling;
  • Supports not only offline synchronization and computation but is also compatible with real-time scenarios;
  • Supports dirty-data storage and provides metric monitoring;
  • Works with the Flink checkpoint mechanism to provide resumable transfers and task disaster recovery;
  • Supports not only DML data synchronization but also DDL synchronization, such as 'CREATE TABLE' and 'ALTER COLUMN'.
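
As a sketch of the JSON template style, the following is a minimal job script modeled on the bundled example chunjun-examples/json/stream/stream.json; the field names shown are illustrative, and the exact parameter schema of each connector is described in the official docs:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              { "name": "id", "type": "int" },
              { "name": "name", "type": "string" }
            ],
            "sliceRecordCount": ["30"]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}

A script like this is what the -job parameter points at in the run commands of the Quick Start section below.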

Build and Compilation

Get the code

Use git to clone the ChunJun source code:

git clone https://github.com/DTStack/chunjun.git

Build

Execute the command below in the project root directory:

./mvnw clean package

Or execute

sh build/build.sh

Common problem

Compiling the module 'chunjun-core' throws 'Failed to read artifact descriptor for com.google.errorprone:javac-shaded'

Error message:

[ERROR]Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.4.2:check(spotless-check)on project chunjun-core:
        Execution spotless-check of goal com.diffplug.spotless:spotless-maven-plugin:2.4.2:check failed:Unable to resolve dependencies:
        Failed to collect dependencies at com.google.googlejavaformat:google-java-format:jar:1.7->com.google.errorprone:javac-shaded:jar:9+181-r4173-1:
        Failed to read artifact descriptor for com.google.errorprone:javac-shaded:jar:9+181-r4173-1:Could not transfer artifact
        com.google.errorprone:javac-shaded:pom:9+181-r4173-1 from/to aliyunmaven(https://maven.aliyun.com/repository/public): 
        Access denied to:https://maven.aliyun.com/repository/public/com/google/errorprone/javac-shaded/9+181-r4173-1/javac-shaded-9+181-r4173-1.pom -> [Help 1]

Solution: download 'javac-shaded-9+181-r4173-1.jar' from 'https://repo1.maven.org/maven2/com/google/errorprone/javac-shaded/9+181-r4173-1/javac-shaded-9+181-r4173-1.jar', then install it locally with the command below:

mvn install:install-file -DgroupId=com.google.errorprone -DartifactId=javac-shaded -Dversion=9+181-r4173-1 -Dpackaging=jar -Dfile=./jars/javac-shaded-9+181-r4173-1.jar

Quick Start

The following table shows the correspondence between ChunJun branches and Flink versions. If the versions are not aligned, problems such as serialization exceptions and NoSuchMethodError may occur in tasks.

Branches      Flink version
master        1.16.1
1.12_release  1.12.7
1.10_release  1.10.1
1.8_release   1.8.3

ChunJun supports running tasks in multiple modes. Different modes depend on different environments and setup steps; the following sections describe each mode.

Local

Local mode does not depend on a Flink or Hadoop environment; it starts a JVM process on the local machine to run the task.

Steps

Go to the directory of 'chunjun-dist' and execute the command below:

sh bin/chunjun-local.sh -job $SCRIPT_PATH

The '$SCRIPT_PATH' parameter is the path where the task script is located. After executing the command, the task runs locally.

Note:

If you package on Windows and run the shell scripts on Linux, you need to execute the command sed -i "s/\r//g" bin/*.sh first to fix '\r' line-ending problems.

Reference video

Standalone

Standalone mode depends on a Flink standalone cluster and does not depend on a Hadoop environment.

Steps

1. Add the ChunJun jars
  1. Find the directory of jars: if you built the project with Maven, the directory is 'chunjun-dist'; if you downloaded a tar.gz file from the release page, after decompression the directory is named like 'chunjun-assembly-${revision}-chunjun-dist'.

  2. Copy the jars into Flink's lib directory, for example:

cp -r chunjun-dist $FLINK_HOME/lib

Notice: this operation should be executed on all machines of the Flink cluster, otherwise some jobs will fail with a ClassNotFoundException.

2. Start the Flink standalone cluster
sh $FLINK_HOME/bin/start-cluster.sh

After startup succeeds, the Flink web UI listens on port 8081 by default, which can be configured in 'flink-conf.yaml'. Visit port 8081 of the current machine to open the Flink web UI of the standalone cluster.

3. Submit task

Go to the directory of 'chunjun-dist' and execute the command below:

sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json

After the command executes successfully, you can observe the task status in the Flink web UI.

Reference video

Yarn Session

Yarn-session mode depends on the Flink jars and a Hadoop environment, and the yarn-session must be started before the task is submitted.

Steps

1. Start yarn-session environment

Yarn-session mode depends on the Flink and Hadoop environments. You need to set $HADOOP_HOME and $FLINK_HOME in advance, and upload 'chunjun-dist' with the yarn-session '-t' parameter:

cd $FLINK_HOME/bin
./yarn-session.sh -t $CHUNJUN_HOME -d
2. Submit task

Get the application id $SESSION_APPLICATION_ID of the yarn-session from the YARN web UI, then enter the 'chunjun-dist' directory and execute the command below:

sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"SESSION_APPLICATION_ID\"}

'yarn.application.id' can also be set in 'flink-conf.yaml'. After the submission succeeds, the task status can be observed on the YARN web UI.
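
For readability, the escaped -confProp value in the command above is simply the following JSON object, where SESSION_APPLICATION_ID remains the placeholder for the real application id:

{
  "yarn.application.id": "SESSION_APPLICATION_ID"
}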

Reference video

Yarn Per-Job

Yarn per-job mode depends on the Flink and Hadoop environments. You need to set $HADOOP_HOME and $FLINK_HOME in advance.

Steps

Once the configuration is correct, the yarn per-job task can be submitted. Enter the 'chunjun-dist' directory and execute the command below:

sh ./bin/chunjun-yarn-perjob.sh -job chunjun-examples/json/stream/stream.json

After the submission succeeds, the task status can be observed on the YARN web UI.

Docs of Connectors

For details, please visit: https://dtstack.github.io/chunjun/documents/

Contributors

Thanks to all contributors! We are very happy to have you contribute to ChunJun.

License

ChunJun is under the Apache 2.0 license. Please visit LICENSE for details.

Contact Us

Join ChunJun Slack. https://join.slack.com/t/chunjun/shared_invite/zt-1hzmvh0o3-qZ726NXmhClmLFRMpEDHYw

chunjun's Issues

Nice job

I borrowed several of your sinks and mapFuncs; very well written. Kudos!

Standalone task submission fails with "Failed to retrieve JobManager address"; local submission runs fine

The Flink cluster runs the bundled test examples without problems.

Command: bin/flinkx -mode standalone -job /tmp/zyl/flink-data-transfer/jobs/mysql_to_mysql.json -plugin /tmp/zyl/flink-data-transfer/plugins -flinkconf /opt/flink-1.6.1/conf

Error when submitting the task in standalone mode:
14:08:18.828 [main] INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/tmp/zyl/flink-data-transfer
14:08:18.829 [main] INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=bd129106:2181,bd129107:2181,bd129108:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@4c60d6e9
14:08:18.831 [main] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - zookeeper.disableAutoWatchReset is false
14:08:18.843 [main-SendThread(bd129108:2181)] INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server bd129108/192.168.129.108:2181. Will not attempt to authenticate using SASL (unknown error)
14:08:18.844 [main-SendThread(bd129108:2181)] INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket connection established to bd129108/192.168.129.108:2181, initiating session
14:08:18.847 [main-SendThread(bd129108:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session establishment request sent on bd129108/192.168.129.108:2181
14:08:18.883 [main-SendThread(bd129108:2181)] INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session establishment complete on server bd129108/192.168.129.108:2181, sessionid = 0x166282611458a6b, negotiated timeout = 60000
14:08:18.894 [main-EventThread] INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
14:08:19.077 [main] INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService.
14:08:19.103 [main-SendThread(bd129108:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x166282611458a6b, packet:: clientPath:null serverPath:null finished:false header:: 1,3 replyHeader:: 1,691490210287,0 request:: '/flinkx,F response:: s{691489993987,691489993987,1539159387219,1539159387219,0,1,0,0,0,1,691489993988}
14:08:19.105 [main-SendThread(bd129108:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x166282611458a6b, packet:: clientPath:null serverPath:null finished:false header:: 2,3 replyHeader:: 2,691490210287,0 request:: '/flinkx/default,F response:: s{691489993988,691489993988,1539159387246,1539159387246,0,8,0,0,0,4,691490163480}
14:08:19.112 [main-SendThread(bd129108:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x166282611458a6b, packet:: clientPath:null serverPath:null finished:false header:: 3,3 replyHeader:: 3,691490210287,0 request:: '/flinkx,F response:: s{691489993987,691489993987,1539159387219,1539159387219,0,1,0,0,0,1,691489993988}
14:08:19.114 [main-SendThread(bd129108:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x166282611458a6b, packet:: clientPath:null serverPath:null finished:false header:: 4,3 replyHeader:: 4,691490210287,0 request:: '/flinkx/default,F response:: s{691489993988,691489993988,1539159387246,1539159387246,0,8,0,0,0,4,691490163480}
14:08:19.116 [main-SendThread(bd129108:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x166282611458a6b, packet:: clientPath:null serverPath:null finished:false header:: 5,3 replyHeader:: 5,691490210287,0 request:: '/flinkx/default/leader,F response:: s{691489993989,691489993989,1539159387272,1539159387272,0,81012,0,0,0,4,691490163489}
14:08:19.118 [main-SendThread(bd129108:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x166282611458a6b, packet:: clientPath:null serverPath:null finished:false header:: 6,3 replyHeader:: 6,691490210287,0 request:: '/flinkx/default/leader/00000000000000000000000000000000,F response:: s{691490098615,691490098615,1539421562506,1539421562506,0,0,0,0,0,0,691490098615}
14:08:19.120 [main-SendThread(bd129108:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x166282611458a6b, packet:: clientPath:/flinkx/default/leader/00000000000000000000000000000000/job_manager_lock serverPath:/flinkx/default/leader/00000000000000000000000000000000/job_manager_lock finished:false header:: 7,3 replyHeader:: 7,691490210287,-101 request:: '/flinkx/default/leader/00000000000000000000000000000000/job_manager_lock,T response::
14:08:39.140 [main-SendThread(bd129108:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x166282611458a6b after 0ms
14:08:59.158 [main-SendThread(bd129108:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x166282611458a6b after 0ms
14:09:19.121 [main] INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
Exception in thread "main" java.lang.RuntimeException: Failed to retrieve JobManager address
at org.apache.flink.client.program.ClusterClient.getJobManagerAddress(ClusterClient.java:308)
at org.apache.flink.client.program.StandaloneClusterClient.getWebInterfaceURL(StandaloneClusterClient.java:56)
at com.dtstack.flinkx.launcher.Launcher.main(Launcher.java:92)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader address and leader session ID.
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:113)
at org.apache.flink.client.program.ClusterClient.getJobManagerAddress(ClusterClient.java:302)
... 2 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:111)
... 3 more

Could someone help take a look? Thanks.

Does FlinkX support intermediate data transformation?

For example, in many cases the requirement is not just a simple copy from the source database to the target database; it may also include transformation steps such as format conversion, deduplication, aggregation, joins and so on. Can these be accomplished with FlinkX?

Build error: ServerAddress cannot be found

[ERROR] /Users/kolar/OneDrive/github/flinkx/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java:[149,25] cannot find symbol
[ERROR] symbol: class ServerAddress
[ERROR] location: class com.dtstack.flinkx.mongodb.MongodbUtil
[ERROR] /Users/kolar/OneDrive/github/flinkx/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java:[82,18] cannot find symbol
[ERROR] symbol: class ServerAddress
[ERROR] location: class com.dtstack.flinkx.mongodb.MongodbUtil
[ERROR] /Users/kolar/OneDrive/github/flinkx/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java:[88,31] cannot access com.mongodb.ServerAddress
[ERROR] class file for com.mongodb.ServerAddress not found
[ERROR] /Users/kolar/OneDrive/github/flinkx/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java:[90,17] cannot find symbol
[ERROR] symbol: class MongoCredential
[ERROR] location: class com.dtstack.flinkx.mongodb.MongodbUtil
[ERROR] /Users/kolar/OneDrive/github/flinkx/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java:[90,46] cannot find symbol
[ERROR] symbol: variable MongoCredential
[ERROR] location: class com.dtstack.flinkx.mongodb.MongodbUtil
[ERROR] /Users/kolar/OneDrive/github/flinkx/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java:[91,22] cannot find symbol
[ERROR] symbol: class MongoCredential
[ERROR] location: class com.dtstack.flinkx.mongodb.MongodbUtil
[ERROR] /Users/kolar/OneDrive/github/flinkx/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java:[150,14] cannot find symbol
[ERROR] symbol: class ServerAddress
[ERROR] location: class com.dtstack.flinkx.mongodb.MongodbUtil
[ERROR] /Users/kolar/OneDrive/github/flinkx/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java:[165,17] cannot find symbol
[ERROR] symbol: class ServerAddress
[ERROR] location: class com.dtstack.flinkx.mongodb.MongodbUtil
[ERROR] /Users/kolar/OneDrive/github/flinkx/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java:[165,51] cannot find symbol
[ERROR] symbol: class ServerAddress
[ERROR] location: class com.dtstack.flinkx.mongodb.MongodbUtil
[ERROR] /Users/kolar/OneDrive/github/flinkx/flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java:[196,28] cannot find symbol
[ERROR] symbol: variable WriteConcern
[ERROR] location: class com.dtstack.flinkx.mongodb.MongodbUtil
I checked: the mongodb-driver jar does not contain these classes, while mongo-java-driver does.

HdfsWriter

Hi, I'd like to ask why hdfswriter writes data through Hive instead of writing HDFS files directly?

Yarn mode fails with ClassNotFoundException

Local mode runs without problems, but yarn mode does not:
15:43:13.626 [main] INFO org.apache.flink.yarn.YarnClusterClient - Starting program in interactive mode
15:43:13.667 [main] INFO com.dtstack.flinkx.loader.DTClassLoader - urls=[file:/data/tmp/xxxxx/flinkx/plugins/common/flinkx-rdb.jar, file:/data/tmp/xxxxx/flinkx/plugins/mysqlreader/flinkx-mysql-reader.jar]
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
at com.dtstack.flinkx.launcher.Launcher.main(Launcher.java:101)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.dtstack.flinkx.mysql.reader.MysqlReader
at com.dtstack.flinkx.plugin.PluginLoader.getPluginClass(PluginLoader.java:104)
at com.dtstack.flinkx.reader.DataReaderFactory.getDataReader(DataReaderFactory.java:44)
at com.dtstack.flinkx.Main.main(Main.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
... 3 more
Caused by: java.lang.ClassNotFoundException: com.dtstack.flinkx.mysql.reader.MysqlReader
at com.dtstack.flinkx.loader.DTClassLoader.loadClass(DTClassLoader.java:100)
at com.dtstack.flinkx.loader.DTClassLoader.loadClass(DTClassLoader.java:65)
at com.dtstack.flinkx.plugin.PluginLoader.getPluginClass(PluginLoader.java:102)
... 10 more
15:43:13.705 [Thread-4] INFO org.apache.flink.yarn.YarnClusterClient - Shutting down YarnClusterClient from the client shutdown hook
15:43:13.706 [Thread-4] INFO org.apache.flink.yarn.YarnClusterClient - Disconnecting YarnClusterClient from ApplicationMaster

oracle_to_oracle

Caused by: Error : 904, Position : 19, Sql = SELECT "id","name","telphone" FROM SB1, OriginalSql = SELECT "id","name","telphone" FROM SB1, Error Msg = ORA-00904: "telphone": invalid identifier
When composing the query SQL, keywords get escaped with double quotes, and Oracle then fails to recognize them.

Encountered "GC life time is shorter than transaction duration" during synchronization

java.io.IOException: Couldn't read data - GC life time is shorter than transaction duration, transaction starts at 2019-02-18 16:10:52.016 +0800 CST, GC safe point is 2019-02-18 16:29:41.316 +0800 CST

Can this configuration be modified?

P.S. When FlinkX synchronizes MySQL data, how many rows does it pull from MySQL at a time?

Sync MySQL to MySQL fails with a null error

Synchronizing MySQL data to MySQL: the source table has a column with a NOT NULL constraint (the actual values are empty strings). When writing to the target, it fails with an error that the column cannot be null.

MySQL to Hive behaves the same way.

What is the principle behind JDBC parallel reads?

JDBC reads configure parallelism through JdbcInputFormatBuilder.setParameterValues(Object[][] parameterValues), but I don't quite understand the principle. I can only see that in the SQL statement, 'where mod(splitKey, N) = [0,1,2...N-1]' (N being the parallelism) splits the result set into N parts; I don't know the rest of the implementation.
Could someone with spare time help walk through the implementation idea? Many thanks!
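
A worked illustration, grounded in the JdbcInputFormat source quoted later on this page: createInputSplits() builds one GenericInputSplit per row of parameterValues, and openInternal() binds parameterValues[split.getSplitNumber()] into the prepared statement before executing it. So with a parallelism of N = 3, three splits are created, and the three subtasks run the same query template as 'where mod(splitKey, 3) = 0', '= 1' and '= 2' respectively, producing three disjoint result sets that together cover the whole table.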

Yarn mode, 1.5 branch

I pulled the latest 1.5 code on October 24; the Flink cluster is version 1.5. Execution fails and the job cannot be submitted to the cluster. Two errors are currently reported.

1. RestClusterClient - Could not retrieve the web interface URL for the cluster
17:20:36.510 [main] WARN org.apache.flink.client.program.rest.RestClusterClient - Could not retrieve the web interface URL for the cluster.
java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
at org.apache.flink.client.program.rest.RestClusterClient.getWebInterfaceURL(RestClusterClient.java:630)
at com.dtstack.flinkx.launcher.Launcher.main(Launcher.java:88)
Caused by: java.util.concurrent.TimeoutException: null
at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


2. The job cannot be submitted
17:20:46.550 [main-SendThread(bd129107:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x366999882f814c3 after 15ms
17:21:06.540 [main-SendThread(bd129107:2181)] DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x366999882f814c3 after 5ms
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Could not submit job 27c8db013acce632655e8da37eb75a3f.
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:248)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410)
at com.dtstack.flinkx.launcher.Launcher.main(Launcher.java:97)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:357)
at org.apache.flink.client.program.rest.RestClusterClient$$Lambda$19/291847739.apply(Unknown Source)
at java.util.concurrent.CompletableFuture$ExceptionCompletion.run(CompletableFuture.java:1246)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
at java.util.concurrent.CompletableFuture$ThenApply.run(CompletableFuture.java:723)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
at java.util.concurrent.CompletableFuture$ThenCombine.run(CompletableFuture.java:882)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
at java.util.concurrent.CompletableFuture$ThenApply.run(CompletableFuture.java:723)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2361)
at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:205)
... 14 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
at org.apache.flink.runtime.concurrent.FutureUtils$$Lambda$12/288797801.accept(Unknown Source)
at java.util.concurrent.CompletableFuture.doWhenComplete(CompletableFuture.java:2048)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2563)
at org.apache.flink.runtime.concurrent.FutureUtils.retryOperationWithDelay(FutureUtils.java:196)
at org.apache.flink.runtime.concurrent.FutureUtils.retryWithDelay(FutureUtils.java:151)
at org.apache.flink.client.program.rest.RestClusterClient.retry(RestClusterClient.java:694)
at org.apache.flink.client.program.rest.RestClusterClient.sendRetriableRequest(RestClusterClient.java:682)
at org.apache.flink.client.program.rest.RestClusterClient.sendRequest(RestClusterClient.java:676)
at org.apache.flink.client.program.rest.RestClusterClient.sendRequest(RestClusterClient.java:671)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:317)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:242)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410)
at com.dtstack.flinkx.launcher.Launcher.main(Launcher.java:97)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException

Oracle jar version problem: com.github.noraui:ojdbc8:jar:12.2.0.1 can no longer be found

[INFO] ------------------------------------------------------------------------

[ERROR] Failed to execute goal on project flinkx-oracle-core: Could not resolve dependencies for project com.dtstack.flinkx:flinkx-oracle-core:jar:1.6: Could not find artifact com.oracle.jdbc:ojdbc8:jar:12.2.0.1 in central (https://repo.maven.apache.org/maven2) -> [Help 1]

[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn -rf :flinkx-oracle-core

On-yarn mode: an error occurs once channel is set above 5

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: ch.qos.logback.classic.Logger
ClassLoader info: URL ClassLoader:
file: '/data3/yarn/nm/usercache/root/appcache/application_1544768545362_0843/blobStore-8adb0e17-dc6d-4483-9bf3-c9e17218fff4/job_0be51bf30c1876bac50876dcda78602d/blob_p-09ff3108ffc810d694ec5cf7e4370b8ea3b81890-68d3a95ce8b51258378b6a941c71e486' (valid JAR)
file: '/data/software/flinkx/plugins/mysqlreader/flinkx-mysql-reader.jar' (valid JAR)
file: '/data/software/flinkx/plugins/hdfswriter/flinkx-hdfs-writer.jar' (valid JAR)
file: '/data/software/flinkx/plugins/common/flinkx-rdb.jar' (valid JAR)
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: ch.qos.logback.classic.Logger
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createStreamRecordWriters(StreamTask.java:1164)
at org.apache.flink.streaming.runtime.tasks.StreamTask.(StreamTask.java:212)
at org.apache.flink.streaming.runtime.tasks.StreamTask.(StreamTask.java:190)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.(SourceStreamTask.java:51)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1398)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:682)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: ch.qos.logback.classic.Logger

"job": { 3 "setting": { 4 "speed": { 5 "channel": xxx 6 },
channel 一多就会有这个错误
如果设置小 比如 4就能正常, 设置成5的时候 偶尔正常执行

Fault tolerance support

Does this project support fault tolerance?
Suppose an ETL job is running and some nodes crash; the data transfer should be guaranteed without any data loss or duplication, which is a kind of transaction. Using Flink as a building block for ETL is a good idea; however, providing such features is non-trivial. Does FlinkX support this?

Connection timeout when writing to HDFS

14:52:51.595 [flink-akka.actor.default-dispatcher-9] DEBUG akka.event.EventStream - shutting down: StandardOutLogger started
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.ConnectException: Call From hadoop1.test.yunwei.puppet.dh/192.168.112.47 to DHTestCluster:8020 failed on connection exception: java.net.ConnectException: Connection timed out; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
at org.apache.hadoop.ipc.Client.call(Client.java:1414)
at org.apache.hadoop.ipc.Client.call(Client.java:1363)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy13.getFileInfo(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy13.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:699)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1762)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
at com.dtstack.flinkx.hdfs.writer.HdfsOutputFormat.openInternal(HdfsOutputFormat.java:139)
at com.dtstack.flinkx.outputformat.RichOutputFormat.open(RichOutputFormat.java:189)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:61)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection timed out
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:493)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:604)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:699)
at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:367)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1462)
at org.apache.hadoop.ipc.Client.call(Client.java:1381)
... 27 more

hdfswriter

Can hdfswriter write to a specified file name? The current filename parameter does not achieve this.

Reading 10 million rows from a single MySQL table fails

I have recently been using FlinkX (Flink 1.4.0, 1 GB slot memory, two slots per TaskManager, a cluster of three servers) and found that extracting MySQL data (a single table of 10 million rows, about 500 bytes per row) hangs the task: in JdbcInputFormat.openInternal(), the call resultSet = statement.executeQuery(); never returns. Following the JDBC reading approach in JdbcInputFormat.java, I wrote a standalone Java test case; with JVM options -Xms1024m -Xmx1024m it throws an OOM exception, and raising the heap to 2 GB makes it run normally. I suspect the configured fetch size does not take effect: changing statement.setFetchSize(databaseInterface.getFetchSize()); in openInternal() to statement.setFetchSize(Integer.MIN_VALUE); makes it run normally within 1 GB of memory.
At the same time, the code that obtains descColumnTypeList needs to be moved before statement.executeQuery(); otherwise the following exception is thrown:
java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@6438a396 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

Attached is the modified JdbcInputFormat.java:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flinkx.rdb.inputformat;

import com.dtstack.flinkx.rdb.DatabaseInterface;
import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
import com.dtstack.flinkx.rdb.util.DBUtil;
import com.dtstack.flinkx.util.ClassUtil;
import com.dtstack.flinkx.util.DateUtil;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.Counter;
import org.apache.flink.types.Row;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.sql.Date;
import java.util.*;

import com.dtstack.flinkx.inputformat.RichInputFormat;

/**
 * InputFormat for reading data from a database and generating Rows.
 *
 * Company: www.dtstack.com
 *
 * @author [email protected]
 */
public class JdbcInputFormat extends RichInputFormat {

    protected static final long serialVersionUID = 1L;

    protected DatabaseInterface databaseInterface;

    protected String username;

    protected String password;

    protected String drivername;

    protected String dbURL;

    protected String queryTemplate;

    protected int resultSetType;

    protected int resultSetConcurrency;

    protected List descColumnTypeList;

    protected transient Connection dbConn;

    protected transient PreparedStatement statement;

    protected transient ResultSet resultSet;

    protected boolean hasNext;

    protected Object[][] parameterValues;

    protected int columnCount;

    protected String table;

    protected TypeConverterInterface typeConverter;

    protected List column;

    public JdbcInputFormat() {
        resultSetType = ResultSet.TYPE_FORWARD_ONLY;
        resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
    }

    @Override
    public void configure(Configuration configuration) {
    }

    @Override
    public void openInternal(InputSplit inputSplit) throws IOException {
        try {
            ClassUtil.forName(drivername, getClass().getClassLoader());
            dbConn = DBUtil.getConnection(dbURL, username, password);

            if (drivername.equalsIgnoreCase("org.postgresql.Driver")) {
                dbConn.setAutoCommit(false);
            }

            statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);

            // Moved ahead of executeQuery() | by lgm: a streaming MySQL result set
            // forbids issuing further statements on the same connection.
            if (descColumnTypeList == null) {
                descColumnTypeList = DBUtil.analyzeTable(dbConn, databaseInterface, table, column);
            }

            if (inputSplit != null && parameterValues != null) {
                for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
                    Object param = parameterValues[inputSplit.getSplitNumber()][i];
                    DBUtil.setParameterValue(param, statement, i);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate,
                            Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
                }
            }

            // statement.setFetchSize(databaseInterface.getFetchSize());
            // Re-set the fetch size | by lgm: for the MySQL driver, Integer.MIN_VALUE
            // switches Connector/J to row-by-row streaming instead of buffering the
            // whole result set in memory.
            if (drivername.equalsIgnoreCase("com.mysql.jdbc.Driver")) {
                statement.setFetchSize(Integer.MIN_VALUE);
            } else {
                statement.setFetchSize(databaseInterface.getFetchSize());
            }
            statement.setQueryTimeout(databaseInterface.getQueryTimeout());

            resultSet = statement.executeQuery();
            hasNext = resultSet.next();
            columnCount = resultSet.getMetaData().getColumnCount();

        } catch (SQLException se) {
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        }

        LOG.info("JdbcInputFormat[" + jobName + "]open: end");
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (parameterValues == null) {
            return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
        }
        GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
        for (int i = 0; i < ret.length; i++) {
            ret[i] = new GenericInputSplit(i, ret.length);
        }
        return ret;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return !hasNext;
    }

    @Override
    public Row nextRecordInternal(Row row) throws IOException {
        row = new Row(columnCount);
        try {
            if (!hasNext) {
                return null;
            }

            DBUtil.getRow(dbURL, row, descColumnTypeList, resultSet, typeConverter);
            // update hasNext after we've read the record
            hasNext = resultSet.next();
            return row;
        } catch (SQLException se) {
            throw new IOException("Couldn't read data - " + se.getMessage(), se);
        } catch (NullPointerException npe) {
            throw new IOException("Couldn't access resultSet", npe);
        }
    }

    @Override
    public void closeInternal() throws IOException {
        DBUtil.closeDBResources(resultSet, statement, dbConn);
        parameterValues = null;
    }
}

Carbondata writer: currently using the 1.5 version; will it be backward compatible with CarbonData 1.3?

I am new to CarbonData and don't know what the written CarbonData file structure looks like. Downloaded it, and testing throws an exception:
java.lang.RuntimeException: java.lang.InterruptedException: org.apache.carbondata.core.datastore.exception.CarbonDataWriterException: org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
at com.dtstack.flinkx.carbondata.writer.CarbonOutputFormat.closeInternal(CarbonOutputFormat.java:261)
at com.dtstack.flinkx.outputformat.RichOutputFormat.close(RichOutputFormat.java:346)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:96)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:438)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException: org.apache.carbondata.core.datastore.exception.CarbonDataWriterException: org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
at org.apache.carbondata.hadoop.api.CarbonTableOutputFormat$CarbonRecordWriter.close(CarbonTableOutputFormat.java:456)
at com.dtstack.flinkx.carbondata.writer.recordwriter.AbstractRecordWriter.closeRecordWriter(AbstractRecordWriter.java:108)
at com.dtstack.flinkx.carbondata.writer.recordwriter.AbstractRecordWriter.close(AbstractRecordWriter.java:174)
at com.dtstack.flinkx.carbondata.writer.CarbonOutputFormat.closeInternal(CarbonOutputFormat.java:259)
... 8 more

Starting multiple tasks at the same time in local mode causes port conflicts

11:00:05.882 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: unpooled
11:00:05.883 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536
11:00:05.886 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.NetUtil - Loopback interface: lo (lo, 127.0.0.1)
11:00:05.887 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.NetUtil - /proc/sys/net/core/somaxconn: 128
11:00:05.907 [main] DEBUG org.apache.flink.queryablestate.server.KvStateServerImpl - Failed to start Queryable State Server on port 9067: Address already in use.
11:00:05.907 [main] INFO org.apache.flink.queryablestate.server.KvStateServerImpl - Shutting down Queryable State Server @ null
11:00:05.908 [main] INFO org.apache.flink.queryablestate.server.KvStateServerImpl - Unable to start Queryable State Server. All ports in provided range are occupied.
11:00:05.908 [main] INFO org.apache.flink.queryablestate.server.KvStateServerImpl - Shutting down Queryable State Server @ null
11:00:05.908 [main] INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Stopping FlinkMiniCluster.

Has anyone else had problems running this? I can't resolve this error and really can't figure out where it goes wrong.

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Could not submit job 58e0b47031e88ada6c51bc5f36b9b717.
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:247)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410)
at com.dtstack.flinkx.launcher.Launcher.main(Launcher.java:100)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 12 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
... 10 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.util.RestClientException: [Job submission failed.]
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job submission failed.]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:308)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:292)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
... 5 more

DB2 jar problem: com.ibm.db2:db2jcc:jar:3.72.44 can no longer be found

[ERROR] Failed to execute goal on project flinkx-db2-core: Could not resolve dependencies for project com.dtstack.flinkx:flinkx-db2-core:jar:1.6: Could not find artifact com.ibm.db2:db2jcc:jar:3.72.44 in central (https://repo.maven.apache.org/maven2) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn -rf :flinkx-db2-core

Data cannot be written to HDFS

Could you leave a DingTalk or WeChat contact for easier communication?
1. I implemented a custom HttpSource, and the data can be read in.
2. But the data cannot be written into HDFS.
====== data read in ====== 75702552,DT35,1,230,2018-07-04 09:56:55,0,0,d9045b79b10df4f0307c158619d3c139,U093,0,0,0,953,3,0,10,0,0,12,71,1,0,0,0,0,0,0,0,0,2447740,0,0,2,7,23,0,0,0,0,1,0,0,0,0,sti9v7jz7OHQ3PbO0f4=,514,0,1,0,4,1,100,0,0,

18:59:30.562 [Source: httpreader (1/1)] INFO com.dtstack.flinkx.http.reader.HttpInputFormat - subtask input close finished
18:59:30.563 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.api.common.io.DefaultInputSplitAssigner - No more input splits available
18:59:30.563 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.jobmanager.JobManager - Send next input split null.
18:59:30.563 [Source: httpreader (1/1)] INFO com.dtstack.flinkx.http.reader.HttpInputFormat - subtask input close finished
18:59:30.565 [Source: httpreader (1/1)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished task Source: httpreader (1/1)
18:59:30.565 [Source: httpreader (1/1)] INFO com.dtstack.flinkx.http.reader.HttpInputFormat - subtask input close finished
18:59:30.566 [Source: httpreader (1/1)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Closed operators for task Source: httpreader (1/1)
18:59:30.577 [Source: httpreader (1/1)] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Finished PipelinedSubpartition [number of buffers: 5 (59570 bytes), finished? true, read view? false].
18:59:30.577 [Source: httpreader (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: httpreader (1/1) (16ac1fb0b1019a13e7bdb5d5826b1ba9) switched from RUNNING to FINISHED.
18:59:30.577 [Source: httpreader (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: httpreader (1/1) (16ac1fb0b1019a13e7bdb5d5826b1ba9).
18:59:30.578 [Source: httpreader (1/1)] DEBUG org.apache.flink.runtime.io.network.NetworkEnvironment - Unregister task Source: httpreader (1/1) from network environment (state: FINISHED).
18:59:30.578 [Source: httpreader (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: httpreader (1/1) (16ac1fb0b1019a13e7bdb5d5826b1ba9) [FINISHED]
18:59:30.579 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task Source: httpreader (16ac1fb0b1019a13e7bdb5d5826b1ba9)
18:59:30.598 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: httpreader (1/1) (16ac1fb0b1019a13e7bdb5d5826b1ba9) switched from RUNNING to FINISHED.
18:59:30.598 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.instance.SlotSharingGroupAssignment - Release simple slot SimpleSlot (0)(0) - b66c222807be61af44639fa9dcdc8514 @ localhost (dataPort=-1) - RELEASED.
18:59:30.599 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - 07/04/2018 18:59:30 Source: httpreader(1/1) switched to FINISHED
07/04/2018 18:59:30 Source: httpreader(1/1) switched to FINISHED
18:59:34.416 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
18:59:39.416 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
18:59:44.416 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
18:59:46.586 [Sink: hdfswriter (1/1)] INFO com.dtstack.flinkx.hdfs.writer.HdfsTextOutputFormat - subtask[0] close()
18:59:49.416 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
18:59:54.416 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
18:59:59.416 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
19:00:04.416 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
19:00:09.415 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
19:00:14.415 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
19:00:19.415 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
