dtstack / flinkstreamsql

Extends real-time SQL on top of open-source Flink; mainly implements joins between streams and side (dimension) tables, and supports all native Flink SQL syntax.

License: Apache License 2.0

Java 99.91% Shell 0.08% Scala 0.01%

flinkstreamsql's Introduction

FlinkStreamSQL


Community

  • We are hiring Flink developers. If you are interested, contact Sishu (WeChat: ysqwhiletrue) and mention the job posting.
    Flink developer job description:
    1. Develop DTStack's Flink-based frameworks: the data-sync tool flinkx and the real-time computing framework flinkstreamsql;
    2. Research the latest real-time big-data computing technologies and bring the suitable ones into the platform to improve the product and its competitiveness.
    Requirements:
    1. Bachelor's degree or above, 3+ years of Flink development experience, proficient in Java; familiarity with Scala or Python is a plus;
    2. Familiar with Flink internals, with experience doing secondary development on the Flink codebase; contributors to the Flink source on GitHub are preferred;
    3. Experience in machine learning or data mining is a plus;
    4. Able to pick up new technologies quickly, with high standards for code quality.
    Bonus points:
    1. Open-source projects on GitHub or other platforms.
    Add WeChat ysqwhiletrue (mention the job posting), or send your resume to [email protected].
  • We use DingTalk for discussion: search for group number [30537511] or scan the QR code below to join the DingTalk group.

Introduction

  • Extends real-time SQL on top of open-source Flink
    • Custom CREATE TABLE syntax (source, sink, and side tables)
    • Custom CREATE VIEW syntax
    • Custom CREATE FUNCTION syntax
    • Implements joins between streams and side tables
    • Supports all native Flink SQL syntax
    • Exposes input and output performance metrics as Task metrics
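To illustrate the extended DDL, here is a minimal side-table declaration. This is an illustrative sketch only, modeled on the redis side-table example quoted in the issues section of this page; the table name, columns, and connection values are placeholders.

```sql
-- Sketch of the custom CREATE TABLE syntax for a side (dimension) table.
-- Property names follow the redis example later on this page; values are placeholders.
CREATE TABLE userSideTable(
    user_id varchar,
    user_name varchar,
    PRIMARY KEY(user_id),
    PERIOD FOR SYSTEM_TIME    -- marks this table as a side table
)WITH(
    type = 'redis',
    url = 'host:port',
    database = '0',
    tableName = 'sidetest',
    cache = 'LRU',
    cacheTTLMs = '10000'
);
```

A stream table joined against such a side table uses ordinary Flink SQL join syntax; the framework resolves the side-table lookup at runtime.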

Contents

1.1 Demo
1.2 Quick start
1.3 Configuration
1.4 Supported plugins and demos
1.5 Metrics
1.6 User-defined functions
1.7 User-defined views

How to contribute to FlinkStreamSQL

PR guidelines

License

FlinkStreamSQL is under the Apache 2.0 license. See the LICENSE file for details.

flinkstreamsql's People

Contributors

chncaption, dengdejiushini, dependabot[bot], flechazow, harbby, hilany, hongtao12310, jiemotongxue, kaidiguo-mr, kanata163, kyo-tom, lijiangbo, simenliuxing, todd5167, xiuzhu9527, xuqianjin-stars, yangsishu, zhihui-ge, zhu4680, zoudaokoulife


flinkstreamsql's Issues

Is union all over multiple streaming source tables supported?

Test SQL:
insert
into
TargTable02

select c.height , c.name from
(
	(
	select 
		a.height,
		a.name
	from
		StreamSrcTable a
	where a.name = 'Tom'
	)
	union all 
	(
	select 
		b.height,
		b.name
	from
		StreamSrcTable02 b
	where b.name ='Jery'
	)
) c
;

In testing, when the two unioned tables are the same table, the output is correct; when they are two different source tables, it throws:
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89)
... 11 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'STREAMSRCTABLE' not found
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

Submitting a job with the sh command fails

Submit command: sh submit.sh -sql /home/lvtao/Desktop/sideSql.txt -name xctest -localSqlPluginPath /home/lvtao/Desktop/flinkStreamSQL/plugins -mode local -confProp {"time.characteristic":"EventTime","sql.checkpoint.interval":10000}

The following error occurs:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
---exeSql---
INSERT INTO MYRESULT
(SELECT D.CHANNEL, D.INFO
FROM (SELECT A.*, B.INFO
FROM MYTABLE AS A
INNER JOIN SIDETABLE AS B ON A.CHANNEL = B.NAME
WHERE A.CHANNEL = 'xc2' AND A.PV = 10) AS D)

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1731457055] with leader session id 93f0d7c4-ebe1-404e-94b9-5e61290438ff.
Exception in thread "main" org.apache.flink.runtime.client.JobSubmissionException: Could not retrieve BlobServer address.
at org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:166)
at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:160)

Repeated job submissions fail with a KafkaSourceTableInfo ClassCastException

I added Netty to the original project and wrapped the service so that jobs can be submitted via REST requests; each request calls Launcher's LauncherMain.main method to submit the job. The first submission succeeds, but every subsequent submission fails with:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
at com.dtstack.flink.sql.launcher.LauncherMain.submit(LauncherMain.java:91)
at com.dtstack.flink.sql.launcher.httpserver.NettyHttpServer$HttpHandler$1.run(NettyHttpServer.java:228)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo cannot be cast to com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo
at com.dtstack.flink.sql.source.kafka.KafkaSource.genStreamSource(KafkaSource.java:56)
at com.dtstack.flink.sql.source.kafka.KafkaSource.genStreamSource(KafkaSource.java:44)
at com.dtstack.flink.sql.source.StreamSourceFactory.getStreamSource(StreamSourceFactory.java:90)
at com.dtstack.flink.sql.Main.registerTable(Main.java:247)
at com.dtstack.flink.sql.Main.main(Main.java:165)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
... 5 more

Cannot compile with Flink 1.6

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project sql.core: Compilation failure
[ERROR] /Users/xianhua.wei/Downloads/flinkStreamSQL/core/src/main/java/com/dtstack/flink/sql/MyLocalStreamEnvironment.java:[103,22] no suitable method found for setLong(org.apache.flink.configuration.ConfigOption<java.lang.String>,long)
[ERROR] method org.apache.flink.configuration.Configuration.setLong(java.lang.String,long) is not applicable
[ERROR] (argument mismatch; org.apache.flink.configuration.ConfigOption<java.lang.String> cannot be converted to java.lang.String)
[ERROR] method org.apache.flink.configuration.Configuration.setLong(org.apache.flink.configuration.ConfigOption<java.lang.Long>,long) is not applicable
[ERROR] (argument mismatch; org.apache.flink.configuration.ConfigOption<java.lang.String> cannot be converted to org.apache.flink.configuration.ConfigOption<java.lang.Long>)
[ERROR]
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn -rf :sql.core

Submitting to a standalone Flink cluster with HA configured fails with an HDFS URI authority error

Submit command:
sh ./submit.sh -sql /home/jinzhiliang/test/flink/sqlTXT/sideSql.txt -name xctest -remoteSqlPluginPath /home/dl/projects/flink-1.4.2/plugins -localSqlPluginPath /home/dl/projects/flink-1.4.2/plugins -mode standalone -flinkconf /home/dl/projects/flink-1.4.2/conf -confProp {"time.characteristic":"EventTime","sql.checkpoint.interval":10000}

The cluster is deployed in standalone mode with HA configured. Submitting jobs directly with Flink works, but submitting with flinkStreamSQL fails:
Exception in thread "main" java.lang.RuntimeException: Couldn't retrieve standalone cluster
at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:49)
at com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:70)
at com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:59)
at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:60)
Caused by: java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:103)
at org.apache.flink.client.program.ClusterClient.(ClusterClient.java:144)
at org.apache.flink.client.program.StandaloneClusterClient.(StandaloneClusterClient.java:44)
at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:47)
... 3 more
Caused by: java.io.IOException: The given file system URI (hdfs:///flink/ha) did not describe the authority (like for example HDFS NameNode address/port or S3 host). The attempt to use a configured default authority failed: Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') contains no valid authority component (like hdfs namenode, S3 host, etc)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:116)

So: are HA clusters in standalone mode supported?

Integration with the Flink SQL Client

Are there plans to integrate with the SQL Client?

Or put differently: how can users be given a single unified SQL entry point when they want both stream-to-stream joins and stream-to-side-table joins?

flinkStreamSQL 1.6.0_release: JobManager connection timeout. Has anyone run into this, and how can it be resolved?

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 ms (JobID: d4cfc96ea9bf00c66083221bb2a3a049)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:556)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:117)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:432)
at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:86)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 ms
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:438)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:547)
... 5 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:436)
... 6 more

Kafka 0.11 join MySQL NullPointerException

Based on Flink 1.4.0.

The submit command:
./submit.sh -sql ../mysqlsidekafka -name mysqljoinkafka -remoteSqlPluginPath /opt/flinkStreamSQL/plugins/ -localSqlPluginPath /opt/flinkStreamSQL/plugins/ -mode standalone -flinkconf /opt/flink-1.4.0/conf -confProp {"time.characteristic":"ProcessingTime","sql.env.parallelism":1}

The nohup log:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
---exeSql---
INSERT INTO ADJOINTRAIN
(SELECT A.AID, A.UID, B.PRODUCTID
FROM ADFEATURE AS A
INNER JOIN TRAIN AS B ON A.AID = B.AID)
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:68)
Caused by: java.lang.NullPointerException
at org.apache.commons.collections.CollectionUtils.isEqualCollection(CollectionUtils.java:305)
at com.dtstack.flink.sql.side.SideSqlExec.checkJoinCondition(SideSqlExec.java:525)
at com.dtstack.flink.sql.side.SideSqlExec.exec(SideSqlExec.java:131)
at com.dtstack.flink.sql.Main.main(Main.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
... 3 more

Error when creating a MySQL side table

Exception in thread "main" java.lang.ClassNotFoundException: com.dtstack.flink.sql.source.mysql.table.MysqlSourceParser. Was this class left out of the upload? I can't find it in the source.

Is nested JSON supported in CREATE TABLE?

Suppose I create a table over Kafka data in JSON format, where the type is a JSON object that nests a JSON array. How do I describe such a compound JSON format in the CREATE TABLE SQL? Any guidance would be appreciated.

After submitting a job: "Could not retrieve the leader gateway."

Hi, I've deployed Flink in standalone mode (without Hadoop), started the cluster with start-cluster.sh, and verified with nc that the official simple streaming example runs. But submitting a job with this project (parameters: sh ./submit.sh -sql /home/javajzl/testFile/flink/sqlTXT/sideSql.txt -name xctest -remoteSqlPluginPath /opt/modules/flink-1.5.2/plugins -localSqlPluginPath /opt/modules/flink-1.5.2/plugins -mode standalone -flinkconf /opt/modules/flink-1.5.2/conf -confProp {"time.characteristic":"EventTime","sql.checkpoint.interval":10000}) fails with a leader gateway error:
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:511)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402)
at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:68)
Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:862)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:509)
... 5 more
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway.
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:79)
at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:857)
... 6 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:77)
... 7 more
I checked that the StandaloneSessionClusterEntrypoint process on the master node is running, so why can't the gateway be found?

Submitting a job to a Flink cluster fails with: Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.

System.err: ---exeSql---
INSERT INTO tb_target
(SELECT c.id, c.username, d.deptname, d.description, c.posname, c.money, c.__event_type, c.__timestamp
FROM (SELECT b.id AS id, b.deptid, b.username, a.posname, a.money, a.__timestamp, a.__event_type
FROM tb_pos_curr AS a
INNER JOIN tb_emp AS b ON b.posid = a.id) AS c
INNER JOIN tb_dept AS d ON d.id = c.deptid)
---------side_exe_sql-----
select id,deptid,posid,username from tb_emp where posid=?

---------side_exe_sql-----
select id,deptname,description from tb_dept where id=?

---submit end----

System.out: (none)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.

Flink does not return a job ID on submission, but the job runs successfully. Please let me know if any other information is needed. Many thanks.

Stream table and side table join logic: can one-to-many be handled?

Can flinkstreamsql's join logic between the main (stream) table and the side table handle one-to-many cases? That is, one incoming record matches multiple side-table records; for example, an event that a department was added to the department table joins the user table (the side table) and matches 10 users.

Submitting a job to Flink 1.5.4 in on-YARN mode fails with "Haven't been developed yet!"

Does it support Flink 1.5.4 in on-YARN mode?
The submit command used:
sh ./submit.sh -sql /home/jinzhiliang/test/flink/sqlTXT/sideSql.txt -name xctest -remoteSqlPluginPath /home/dl/projects/flink-1.5.4/plugins -localSqlPluginPath /home/dl/projects/flink-1.5.4/plugins -mode standalone -flinkconf /home/dl/projects/flink-1.5.4/conf -confProp {"time.characteristic":"EventTime","sql.checkpoint.interval":10000}
nohup.out log:
Exception in thread "main" java.lang.UnsupportedOperationException: Haven't been developed yet!
at com.dtstack.flink.sql.launcher.ClusterClientFactory.createYarnClient(ClusterClientFactory.java:158)
at com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:61)
at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:60)
The log says this has not been developed yet!

1.6 local mode: -localSqlPluginPath does not resolve to the concrete plugin jar

I'm new to this open-source project. When running locally, I passed the packaged local plugin directory E:\ideaworkspace\flinkStreamSQL-1.6.0\plugins via the -localSqlPluginPath parameter.


My source table uses kafka010.
When execution reaches PluginUtil.java's getJarFileDirPath method, the File jarFile it obtains is only a directory: E:\ideaworkspace\flinkStreamSQL-1.6.0\plugins\kafka010source
It does not point to the concrete jar file inside, so it fails with:
java.lang.RuntimeException: path E:\ideaworkspace\flinkStreamSQL-1.6.0\plugins\kafka010source not exists!!!
Is there a problem with my input parameters?

Submitting a job to Flink 1.4.0 in standalone mode fails with a ClassNotFoundException

The submit command is:
sh ./submit.sh -sql /home/javajzl/testFile/flink/sqlTXT/sideSql.txt -name xctest -remoteSqlPluginPath /opt/modules/flink-1.5.2/plugins -localSqlPluginPath /opt/modules/flink-1.5.2/plugins -mode standalone -flinkconf /opt/modules/flink-1.5.2/conf -confProp {"time.characteristic":"EventTime","sql.checkpoint.interval":10000}

The error from the task node log:
2018-10-24 20:09:06,928 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Custom Source -> Map -> to: Row -> from: (NAME, HEIGHT, PROCTIME) -> select: (HEIGHT AS ID, NAME) -> to: Tuple2 -> Sink: Unnamed (1/1) (9b2516f64d0f5245888c0371b273a5b7) [DEPLOYING]
2018-10-24 20:09:06,931 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source -> Map -> to: Row -> from: (NAME, HEIGHT, PROCTIME) -> select: (HEIGHT AS ID, NAME) -> to: Tuple2 -> Sink: Unnamed (1/1) (9b2516f64d0f5245888c0371b273a5b7) [DEPLOYING].
2018-10-24 20:09:06,935 INFO org.apache.flink.runtime.blob.BlobClient - Downloading 3c2cf29ec019b5fd78c9adc9f5453e4d/p-e1d178b977317e2485c78864825b487fa11da742-d3e1b655109b356b921b0c4d8157ef06 from /192.168.114.128:39276
2018-10-24 20:09:06,989 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Custom Source -> Map -> to: Row -> from: (NAME, HEIGHT, PROCTIME) -> select: (HEIGHT AS ID, NAME) -> to: Tuple2 -> Sink: Unnamed (1/1) (9b2516f64d0f5245888c0371b273a5b7) [DEPLOYING].
2018-10-24 20:09:06,992 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> to: Row -> from: (NAME, HEIGHT, PROCTIME) -> select: (HEIGHT AS ID, NAME) -> to: Tuple2 -> Sink: Unnamed (1/1) (9b2516f64d0f5245888c0371b273a5b7) switched from DEPLOYING to RUNNING.
2018-10-24 20:09:07,004 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default state backend (Memory / JobManager)
2018-10-24 20:09:07,471 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> to: Row -> from: (NAME, HEIGHT, PROCTIME) -> select: (HEIGHT AS ID, NAME) -> to: Tuple2 -> Sink: Unnamed (1/1) (9b2516f64d0f5245888c0371b273a5b7) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate chained outputs.
at org.apache.flink.streaming.api.graph.StreamConfig.getChainedOutputs(StreamConfig.java:320)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:278)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.types.CRowSerializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:115)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)

Could this be because the plugin packages were not loaded into the job at submission time?

hbaseSink.md sample

CREATE TABLE MyResult( cf:channel STRING, cf:pv BIGINT )WITH( type ='hbase', zookeeperQuorum ='rdos1:2181', tableName ='workerinfo', rowKey ='cf:channel', parallelism ='1', zookeeperParent ='/hbase' )
In the hbaseSink.md sample, cf:channel STRING does not work: the STRING type is not supported and must be changed to varchar.
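Applying the reported fix, the corrected sample would read as follows (varchar in place of STRING; all other properties exactly as in the original sample):

```sql
CREATE TABLE MyResult(
    cf:channel varchar,   -- was STRING; STRING is not accepted here
    cf:pv BIGINT
)WITH(
    type = 'hbase',
    zookeeperQuorum = 'rdos1:2181',
    tableName = 'workerinfo',
    rowKey = 'cf:channel',
    parallelism = '1',
    zookeeperParent = '/hbase'
);
```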

Duplicate operation in the SideSqlExec class

SideSqlExec.java
This part looks redundant:
SideTableInfo sideTableInfo = sideTableMap.get(joinInfo.getRightTableName());
if(sideTableInfo == null){
sideTableInfo = sideTableMap.get(joinInfo.getRightTableName());
}
One of the two lookups should presumably be changed to:
sideTableInfo = sideTableMap.get(joinInfo.getRightTableAlias());

Error when submitting a job to Flink in on-YARN mode

There are two main ways to start Flink on YARN:
(1) start a YARN session (Start a long-running Flink cluster on YARN);
(2) submit and run a Flink job directly on YARN (Run a Flink job on YARN).
Which of these on-YARN modes does flinkstreamsql support?
I currently start Flink on YARN in yarn-session mode, with the command ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
After starting, I can see in the YARN UI that the Flink session cluster is up, and submitting a test job with the native Flink command line works.
But submitting with flinkstreamsql fails to find the JobManager:
Exception in thread "main" java.lang.RuntimeException: Unable to get ClusterClient status from Application Client
at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:253)
at org.apache.flink.yarn.YarnClusterClient.waitForClusterToBeReady(YarnClusterClient.java:515)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:505)
at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:213)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402)
at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:91)
Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:862)
at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:248)
... 7 more
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway.
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:79)
at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:857)
... 8 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)

1.6 local-mode submission fails with "Could not read taskmanager.memory.size"

Local environment: Linux, flinkStreamSql 1.6 branch
Launch script:
java -cp /data/flink/test/lib/* com.dtstack.flink.sql.launcher.LauncherMain -sql /data/flink/test/test.txt -name xctest -localSqlPluginPath /data/flink/test/plugins -mode local -confProp {\"sql.checkpoint.interval\":10000}
Error:
Exception in thread "main" org.apache.flink.configuration.IllegalConfigurationException: Could not read taskmanager.memory.size
at org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration.fromConfiguration(TaskManagerServicesConfiguration.java:236)
at org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startTaskManager(LocalFlinkMiniCluster.scala:234)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:389)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:382)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:382)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:324)
at com.dtstack.flink.sql.MyLocalStreamEnvironment.execute(MyLocalStreamEnvironment.java:115)
at com.dtstack.flink.sql.Main.main(Main.java:203)
at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:60)
Caused by: java.lang.NumberFormatException: text does not start with a number
at org.apache.flink.configuration.MemorySize.parseBytes(MemorySize.java:178)
at org.apache.flink.configuration.MemorySize.parse(MemorySize.java:131)
at org.apache.flink.configuration.MemorySize.parse(MemorySize.java:148)
at org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration.fromConfiguration(TaskManagerServicesConfiguration.java:233)

How does the redis DDL syntax map to the data structures actually stored in Redis?

As titled. Could the developers give a more detailed example of a redis side table, in particular the actual structure of the data stored on the Redis side? For example, given:
create table redisSideTable(
PRIMARY KEY(user_id),
user_name varchar,
user_id varchar,
user_sex varchar,
PERIOD FOR SYSTEM_TIME
)WITH(
type='redis',
url='xxxxxxxxxx:xxxx',
password='',
database='0',
tableName='sidetest',
cache = 'LRU',
cacheTTLMs='10000'
);

For a user table like this, if there are 5000 user records, what Redis data structure is used to store those 5000 entries?

Seeking help with the SQL parser

Hi DTStack:
I'm the author of the stream-computing project harbby/sylph. I've hit some difficulties with batch-stream joins and with the SQL parser.
From your introduction it looks like your parsing is done very well, and I'd like to exchange ideas with you and learn from your work.

There is an error in the docs

1.4.2 Command-line options

model

Description: the execution mode, i.e. the working mode of the Flink cluster
local: local mode
standalone: standalone Flink cluster
yarn: Flink cluster on YARN
Required: no
Default: local

The "model" here should be "mode", meaning the run mode.

Submitting a job from the Linux command line: argument parsing fails

The job was submitted with the following Linux command line (the backslash escapes in the parameters were removed for command-line submission):
sh ./submit.sh -sql /home/javajzl/testFile/flink/sqlTXT/sideSql.txt -name xctest -remoteSqlPluginPath /home/javajzl/testFile/flink/flinkStreamSQL-master/plugins -localSqlPluginPath /home/javajzl/testFile/flink/flinkStreamSQL-master/plugins -mode local -confProp {"time.characteristic":"EventTime","sql.checkpoint.interval":10000}

The nohup log shows that the -confProp argument failed to parse (when the same command is submitted from a test class, it succeeds):
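A likely cause (an assumption, not confirmed by the project): the -confProp JSON is passed unquoted, and its unquoted braces and commas trigger shell brace expansion, so the launcher receives only a fragment of the JSON. The sketch below demonstrates the splitting and the quoted fix:

```shell
# The confProp value from the report:
json='{"time.characteristic":"EventTime","sql.checkpoint.interval":10000}'

# Arguments produced by the unquoted form (bash performs brace expansion):
unquoted_argc=$(bash -c 'set -- {"time.characteristic":"EventTime","sql.checkpoint.interval":10000}; echo $#')

# Arguments produced when the JSON is single-quoted:
quoted_argc=$(bash -c "set -- '$json'; echo \$#")

echo "unquoted: $unquoted_argc args, quoted: $quoted_argc arg"
```

So quoting the value, i.e. -confProp '{"time.characteristic":"EventTime","sql.checkpoint.interval":10000}', should let the JSON reach the launcher as a single intact argument, matching why the test-class submission (which bypasses the shell) succeeds.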
