byzer-org / byzer-lang

Byzer (formerly MLSQL): a low-code, open-source programming language for data pipelines, analytics and AI.

Home Page: https://www.byzer.org

License: Apache License 2.0

Scala 78.07% Java 11.72% HTML 0.04% Roff 0.07% Shell 2.09% ANTLR 0.12% Python 7.76% Dockerfile 0.03% Batchfile 0.10%
machine-learning mlsql bigdata sql-like-dsl

byzer-lang's Introduction


Byzer-lang

Byzer (formerly MLSQL) is a low-code, open-source, distributed programming language for data pipelines, analytics and AI, built for cloud-native environments.

Design protocol: Everything is a table. Byzer is a SQL-like language that simplifies data pipelines, analytics and AI, combined with built-in algorithms and extensions.

We believe that when everything is a table, a simple yet powerful SQL-like language can significantly reduce the effort of data development, without switching between different tools.

Byzer Architecture

(Byzer-lang architecture diagram)

You can build a data product on top of the Byzer engine and Byzer-lang without having your data app interact directly with a computing framework such as Spark, which simplifies the app significantly.

For example, the Byzer organization contributes a data app, Byzer Notebook, which provides notebook interaction and a workflow GUI.

BIP (Byzer Improvement Proposal)

The Byzer project uses BIPs for community collaboration; you can check out feature and architecture designs in the BIPs.

Online Trial

You can visit the official website https://www.byzer.org/ and try Byzer-lang and Byzer Notebook online.

Download

You can download the Byzer engine from the official release channels.

For more details, please refer to the docs.

Install

For dev/test purposes, you can download the Byzer All-In-One package, extract it, and then execute the commands below:

$ cd {BYZER_HOME}
$ ./bin/byzer.sh start

For production purposes, we recommend using the Byzer Server package and deploying it on Hadoop.

You can also install Byzer VSCode Extension to use Byzer-lang.

For the Docker image, please refer to the docs.

Byzer Code Example

Below is an example of how to process the GitHub API as a table to get information about the Byzer organization.

-- Get Github Organization Info

-- set API URL and params
SET org_name="byzer-org";
SET GITHUB_ORGANIZATION_URL="https://api.github.com/orgs/${org_name}";

-- Load Github Organization API as table
LOAD Rest.`$GITHUB_ORGANIZATION_URL` 
 where `config.connect-timeout`="10s"
 and `config.method`="GET"
 and `header.accept`="application/vnd.github.v3+json"
as github_org;


-- decode API response from binary to a json string
select string(content) as content from github_org as response_content;

-- expand the json string 
run response_content as JsonExpandExt.`` where inputCol="content" and structColumn="true" as github_org;

-- retrieve user information and process it as a table
select content.* from github_org as org_info;

-- save the table to delta lake
save overwrite org_info as delta.`github_info_db.byzer_org`;

For more details about Byzer-lang grammar, please refer to the user manual: Byzer-lang Grammar.

Development

  1. Fork this repository and clone it to your local machine:
git clone https://github.com/{YOUR_GITHUB}/byzer-lang.git
  2. Open the project in IntelliJ IDEA and choose Scala version 2.12.10.

  3. In the IntelliJ IDEA Maven settings, check the profiles below:

    • gpg
    • local
    • scala-2.12
    • spark-3.0.0
    • streamingpro-spark-3.0.0-adaptor
  4. Click Maven Refresh and wait for IntelliJ IDEA to finish loading.

  5. Find the class tech.mlsql.example.app.LocalSparkServiceApp and click the Debug button; the Byzer engine will start and you can access the Byzer web console at http://localhost:9003/ (a rough sketch of what such a local entry point does follows below).
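The exact arguments LocalSparkServiceApp passes are not listed here. As a hedged, illustrative sketch only (the object name and flag values below are assumptions inferred from the -streaming.* options used in the spark-submit examples elsewhere on this page, not the actual body of that class), a local dev entry point boils down to:

// Hypothetical sketch: boot the engine through StreamingApp with a local Spark master
// and the REST service on port 9003, which is why the web console is at localhost:9003.
object LocalByzerEngineSketch {
  def main(args: Array[String]): Unit = {
    streaming.core.StreamingApp.main(Array(
      "-streaming.master", "local[*]",       // assumed flag for running Spark inside the IDE
      "-streaming.name", "byzer-local-dev",  // illustrative application name
      "-streaming.platform", "spark",
      "-streaming.rest", "true",             // expose the HTTP API / web console
      "-streaming.driver.port", "9003",
      "-streaming.spark.service", "true"     // keep the engine alive after startup
    ))
  }
}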

Build

You can refer to the byzer-org/byzer-build project to see how to build the Byzer engine binary packages and images.

How to contribute to Byzer-Lang

If you are planning to contribute to this project, please create an issue on our Issues page, even if the topic is not related to the source code itself (e.g., documentation, new ideas, proposals).

This is an active open source project for everyone, and we are always open to people who want to use this system or contribute to it.

For more details about how to contribute to the Byzer Org, please refer to How to Contribute

Contributors

Made with contrib.rocks.

Community

byzer-lang's People

Contributors

2efper, admondguo, allwefantasy, anan0120, bebee4java, bigalansun, cfmcgrady, chaozwn, chncaesar, ckeys, coderoverflow, dongbin86, ghsuzzy, hellozepp, jovany-wang, kaligo-li, lifanfanalice, liuyonghengheng, lwz9103, mapxn, rebiekong, scott-coding, slashwong, wangcheng15, xubo245, xuqianjin-stars, zhengshuaipeng, zhuohuwu0603, zml1206, zzcclp


byzer-lang's Issues

Dependencies between multiple jobs

Job a registers a function; its strategy is SparkStreamingRefStrategy.

Job b depends on a; its strategy is SparkStreamingRefStrategy.

Job c depends on b; its strategy is SparkStreamingStrategy.

Is there any problem with this setup?

Repository DNS in pom.xml cannot be resolved

The repository setting in pom.xml: the Cloudera repo is currently repositonry.cloudera.com, but its DNS cannot be resolved. I am not sure whether this is a DNS problem on my side or a typo; repository.cloudera.com does resolve.

Does StreamingPro support HDFS HA?

./bin/spark-submit --class streaming.core.StreamingApp
--master local[2]
--name sql-interactive
/streamingpro/streamingpro-spark-2.0-1.0.0.jar
-streaming.name sql-interactive
-streaming.job.file.path file:///streamingpro/test.json
-streaming.platform spark
-streaming.rest true
-streaming.driver.port 9004
-streaming.spark.service true
Problem description:
The SQL server is started as above. ServiceFramework and StreamingPro were compiled against Scala 2.11 and Spark 2.2.0 and started by following the README; the error is as follows:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: file:/streamingpro/query.json, expected: hdfs://haclusterdev
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:193)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
at streaming.common.HDFSOperator$.readFile(HDFSOperator.scala:21)
at streaming.core.Dispatcher$.dispatcher(Dispatcher.scala:32)
at streaming.core.strategy.platform.PlatformManager.findDispatcher(PlatformManager.scala:34)
at streaming.core.strategy.platform.PlatformManager.run(PlatformManager.scala:85)
at streaming.core.StreamingApp$.main(StreamingApp.scala:14)
at streaming.core.StreamingApp.main(StreamingApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
My HDFS is configured in HA mode. Judging from this error, does StreamingPro not support HDFS HA?
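For background on the error itself (plain Hadoop behavior, independent of StreamingPro): FileSystem.get(conf) always returns the filesystem of fs.defaultFS, here hdfs://haclusterdev, so handing it a file:// path trips the "Wrong FS" check. A minimal sketch of resolving the filesystem from the path's own scheme instead:

// Minimal sketch using the stock Hadoop API; not StreamingPro code.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def openWithMatchingFs(pathStr: String): java.io.InputStream = {
  val conf = new Configuration()          // picks up fs.defaultFS, e.g. hdfs://haclusterdev
  val path = new Path(pathStr)
  // FileSystem.get(conf) would always return the default (HDFS) filesystem;
  // path.getFileSystem(conf) resolves file://, hdfs://, ... from the path itself.
  val fs: FileSystem = path.getFileSystem(conf)
  fs.open(path)
}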

map reducer

Does the project support MapReduce-style batch processing operations?

Reading JSON from Kafka into tables

This has been tested.

  • Kafka input data:

{"people":{"type":"man","optional":"false"},"payload":["foo","bar"], "name": "hello"}

  • Two output tables:

test2
+----+-------------+-----+
|type| payload| name|
+----+-------------+-----+
| man|["foo","bar"]|hello|
+----+-------------+-----+

test3
+-----+-----+
| name| new|
+-----+-----+
|hello|"foo"|
|hello|"bar"|
+-----+-----+

  • Configuration:
{
    "scalamaptojson": {
        "desc": "测试",
        "strategy": "spark",
        "algorithm": [],
        "ref": [],
        "compositor": [{
                "name": "ss.sources",
                "params": [{
                    "format": "kafka",
                    "kafka.bootstrap.servers": "192.168.48.158:9092",
                    "subscribe": "spark-test",
                    "outputTable": "test",
                    "path": "-"
                }]
            },
            {
                "name": "ss.sql",
                "params": [{
                    "sql": "select CAST(value AS STRING) as json from test",
                    "outputTableName": "test1"
                }
                ]
            },
            {
                "name": "ss.sql",
                "params":[{
                    "sql": "select get_json_object(json, '$.people.type') as type, get_json_object(json, '$.payload') as payload, get_json_object(json, '$.name')  as name from test1",
                    "outputTableName": "test2" 
                }
                ]
            },
            {
                "name": "ss.sql",
                "params":[{
                    "sql": "select name, new from test2 lateral view explode(split(substr(test2.payload,2,length(test2.payload) - 2),',')) exploded as new", 
                    "outputTableName": "test3" 
                }
                ]
            },
            {
                "name": "ss.outputs",
                "params": [{
                    "mode": "append",
                    "format": "console",
                    "inputTableName": "test3",
                    "path": "-"
                },{
                    "mode": "append",
                    "format": "console",
                    "inputTableName": "test2",
                    "path": "-"
                }
                ]
            }
        ],
        "configParams": {}
    }
}

After looking at the project's dependencies, it is basically impossible to build it yourself

It depends on active_orm, which in turn depends on two internal CSDN jars.

These two internal jars are not open source.

I suggest one of two approaches: either put the jars directly into the project, or stop depending on them.

Set up a discussion group

Could you set up a discussion group so everyone can discuss problems together? I have just started with this project and find it quite hard to follow on my own.

A deep dive into StreamingPro

Original article: http://zqhxuyuan.github.io/2017/09/04/2017-09-04-StreamingPro/

StreamingPro supports Spark, Spark Streaming, Spark Structured Streaming, and Flink. The unified entry class for all of them is StreamingApp:

object StreamingApp {
  def main(args: Array[String]): Unit = {
    val params = new ParamsUtil(args)
    require(params.hasParam("streaming.name"), "Application name should be set")
    PlatformManager.getOrCreate.run(params)
  }
}

The streaming.platform parameter selects the runtime platform. Naturally, different execution engines are packaged in different jars.

SHome=/Users/allwefantasy/streamingpro

./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-spark-2.0-0.4.15-SNAPSHOT.jar    \
-streaming.name test    \
-streaming.platform spark_streaming \
-streaming.job.file.path file://$SHome/spark-streaming.json

bin/flink run -c streaming.core.StreamingApp \ 
/Users/allwefantasy/streamingpro/streamingpro.flink-0.4.14-SNAPSHOT-online-1.2.0.jar \
-streaming.name god \
-streaming.platform flink_streaming \
-streaming.job.file.path file:///Users/allwefantasy/streamingpro/flink.json

The jar determines which Runtime gets loaded. The mapping from platform names to Runtimes is defined in the platformNameMapping variable of PlatformManager.
Runtime is an interface whose most important members are the startRuntime and params methods. From here on we refer to a Runtime as an execution engine.

trait StreamingRuntime {
  def startRuntime: StreamingRuntime
  def destroyRuntime(stopGraceful: Boolean, stopContext: Boolean = false): Boolean
  def streamingRuntimeInfo: StreamingRuntimeInfo
  def resetRuntimeOperator(runtimeOperator: RuntimeOperator)
  def configureStreamingRuntimeInfo(streamingRuntimeInfo: StreamingRuntimeInfo)
  def awaitTermination
  def startThriftServer
  def startHttpServer
  def params: JMap[Any, Any]
}

StreamingPro is essentially still launched via spark-submit. The framework's overall execution flow lives in PlatformManager's run method. The main steps are:

  1. Set up the configuration
  2. Create and obtain the runtime environment via reflection
  3. Obtain the dispatcher and all the strategies
  4. Start the REST service and the Thrift service, and register with ZooKeeper (optional)
  5. Start the execution engine and wait for the job to finish

For the concepts of Dispatcher and Strategy, see the author's ServiceframeworkDispatcher project.
The execution engine is created via reflection: the getOrCreate method of the corresponding Scala object is invoked with the params argument, which finally yields a StreamingRuntime instance.

  def platformNameMapping = Map[String, String](
    SPAKR_S_S -> "streaming.core.strategy.platform.SparkStructuredStreamingRuntime",
    SPAKR_STRUCTURED_STREAMING -> "streaming.core.strategy.platform.SparkStructuredStreamingRuntime",
    FLINK_STREAMING -> "streaming.core.strategy.platform.FlinkStreamingRuntime",
    SPAKR_STREAMING -> "streaming.core.strategy.platform.SparkStreamingRuntime",
    SPARK -> "streaming.core.strategy.platform.SparkRuntime"
  )
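Given those mapping entries, the reflective creation described above looks roughly like the sketch below (illustrative only; the real PlatformManager.getRuntime does more, e.g. caching the runtime):

// Rough sketch of reflection-based runtime creation: look up the class name for the
// requested platform, load the Scala object's companion class and invoke getOrCreate(params).
// Names and error handling are simplified; this is not the actual PlatformManager code.
import java.util.{Map => JMap}

def createRuntime(platform: String, params: JMap[Any, Any]): AnyRef = {
  val mapping     = Map("spark" -> "streaming.core.strategy.platform.SparkRuntime")
  val className   = mapping(platform)
  val moduleClass = Class.forName(className + "$")             // a Scala object compiles to <name>$
  val module      = moduleClass.getField("MODULE$").get(null)  // the singleton instance
  val getOrCreate = moduleClass.getMethod("getOrCreate", classOf[JMap[Any, Any]])
  getOrCreate.invoke(module, params)                           // yields a StreamingRuntime
}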

Note: StreamingPro's Runtime is only the execution engine for Spark jobs; loading strategies according to the configuration file is the job of ServiceframeworkDispatcher.
Suppose we define the configuration file below. Since it uses short names, a ShortNameMapping has to be defined:

{
  "convert-multi-csv-to-json": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [
      {
        "name": "testProcessor"
      }
    ],
    "ref": [],
    "compositor": [
      {
        "name": "testCompositor"
      }
    ],
    "configParams": {
    }
  }
}

The definition of DefaultShortNameMapping is shown below. With it, the spark entry in the configuration file is tied back to the classes that ServiceframeworkDispatcher loads.

class DefaultShortNameMapping extends ShortNameMapping {
  private val compositorNameMap: Map[String, String] = Map[String, String](
    "spark" -> "serviceframework.dispatcher.test.DefaultStrategy",
    "testProcessor" -> "serviceframework.dispatcher.test.TestProcessor",
    "testCompositor" -> "serviceframework.dispatcher.test.TestCompositor"
  )
  override def forName(shortName: String): String = {
    if (compositorNameMap.contains(shortName)) compositorNameMap(shortName)
    else shortName
  }
}

The core of ServiceframeworkDispatcher is StrategyDispatcher. When this class is created, it reads the configuration file, parses the strategy, algorithm (processor), ref, compositor and configParams entries, and constructs the corresponding objects.
ServiceframeworkDispatcher is a module-composition framework; it mainly defines three interfaces: Compositor, Processor and Strategy.

The Strategy interface exposes the processor, ref and compositor lists, plus initialize and result methods:

trait Strategy[T] extends ServiceInj{
  def processor:JList[Processor[T]]
  def ref:JList[Strategy[T]]
  def compositor:JList[Compositor[T]]
  def name:String
  def initialize(name:String,alg:JList[Processor[T]],ref:JList[Strategy[T]],com:JList[Compositor[T]],params:JMap[Any,Any])
  def result(params:JMap[Any,Any]):JList[T]
  def configParams:util.Map[Any, Any]
  def stop = {}
}
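The Compositor and Processor interfaces are not quoted in this article; for orientation, here is a sketch of their shape inferred from how they are called further down (their initialize and result call sites). The actual ServiceframeworkDispatcher definitions may differ.

// Illustrative sketches only, reconstructed from the call sites shown later in this article.
import java.util.{List => JList, Map => JMap}

trait Processor[T] {
  def initialize(name: String, params: JList[JMap[String, Any]]): Unit
  def result(params: JMap[Any, Any]): JList[T]
}

trait Compositor[T] {
  def initialize(typeFilters: JList[String], configParams: JList[JMap[Any, Any]]): Unit
  // middleResult is the output of the previous compositor in the chain (null for the first one)
  def result(alg: JList[Processor[T]], ref: JList[Strategy[T]],
             middleResult: JList[T], params: JMap[Any, Any]): JList[T]
}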

Initializing a Strategy requires the algorithms, refs, compositors and configuration; the corresponding method is StrategyDispatcher's createStrategy.

Note the initialize call below: when createAlgorithms and createCompositors initialize their components, they read the params configuration, which is a list of nested maps: JList[JMap[String, Any]].

  def createStrategy(name: String, desc: JMap[_, _]): Option[Strategy[T]] = {
    if (_strategies.contains(name)) return None;
    // Instantiate the strategy: if a short name is given, resolve the full name first, then instantiate the concrete strategy class via Class.forName
    val strategy = Class.forName(shortNameMapping.forName(desc.get("strategy").asInstanceOf[String])).newInstance().asInstanceOf[Strategy[T]]
    // Read the configuration and materialize it as a Map[Any, Any]
    val configParams: JMap[Any, Any] = if (desc.containsKey("configParams")) desc.get("configParams").asInstanceOf[JMap[Any, Any]] else new java.util.HashMap()
    // Initialize the strategy: this requires creating the algorithms, refs and compositors
    strategy.initialize(name, createAlgorithms(desc), createRefs(desc), createCompositors(desc), configParams)
    _strategies.put(name, strategy)
    Option(strategy)
  }

  // Create the algorithms. A strategy gets its results from zero or more algorithms
  private def createAlgorithms(jobJMap: JMap[String, Any]): JList[Processor[T]] = {
    if (!jobJMap.contains("algorithm") && !jobJMap.contains("processor")) return new AList[Processor[T]]()
    val processors = if (jobJMap.contains("algorithm")) jobJMap("algorithm") else jobJMap("processor")
    processors.asInstanceOf[JList[JMap[String, Any]]].map {
      alg =>
        val name = shortName2FullName(alg)
        val processor = Class.forName(name).newInstance().asInstanceOf[Processor[T]]
        val params: JList[JMap[String, Any]] = if (alg.contains("params")) alg("params").asInstanceOf[JList[JMap[String, Any]]] else new AList[JMap[String, Any]]()
        processor.initialize(name, params)
        processor
    }
  }

  // Create the compositors. There can be several and they are invoked in order, somewhat like a filter chain: the first one receives the results of the algorithms or strategies, and each later compositor can only process the output produced by the previous one
  private def createCompositors(jobJMap: JMap[String, Any]): JList[Compositor[T]] = {
    if (!jobJMap.contains("compositor")) return new AList()
    val compositors = jobJMap.get("compositor")
    compositors.asInstanceOf[JList[JMap[String, Any]]].map {
      f =>
        val compositor = Class.forName(shortName2FullName(f)).newInstance().asInstanceOf[Compositor[T]]
        val params: JList[JMap[String, Any]] = if (f.contains("params")) f.get("params").asInstanceOf[JList[JMap[String, Any]]] else new AList[JMap[String, Any]]()
        compositor.initialize(f.get("typeFilter").asInstanceOf[JList[String]], params)
        compositor
    }
  }

The core of ServiceframeworkDispatcher is StrategyDispatcher, and the core of StrategyDispatcher is its dispatch method:

  def dispatch(params: JMap[Any, Any]): JList[T] = {
    findStrategies(clientType) match {
      case Some(strategies) =>
        strategies.flatMap { f => f.result(params) }
    }
  }

Different execution engines implement startup differently:

class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with PlatformManagerListener {
  override def startRuntime: StreamingRuntime = this

  var sparkSession: SparkSession = createRuntime
  def createRuntime = {
    // ... create the SparkSession; parameters decide whether Hive and CarbonData support are enabled
  }

  params.put("_session_", sparkSession) // put the SparkSession into params
  registerUDF  

  override def params: JMap[Any, Any] = _params
}

class SparkStreamingRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with PlatformManagerListener { self =>
  var streamingContext: StreamingContext = createRuntime
  def createRuntime = {
    // create the StreamingContext and put the SparkSession into params
  }

  override def startRuntime = {
    streamingContext.start()
    this
  }
  override def awaitTermination = streamingContext.awaitTermination()
}

The class that actually drives StreamingPro's main flow is SparkStreamingStrategy under streamingpro-commons.
Note: for Spark 1.6 there is also a SparkStreamingStrategy class under streamingpro-spark.

class SparkStreamingStrategy[T] extends Strategy[T] with DebugTrait with JobStrategy {
  var _ref: util.List[Strategy[T]] = _
  var _compositor: util.List[Compositor[T]] = _
  var _processor: util.List[Processor[T]] = _
  var _configParams: util.Map[Any, Any] = _

  def result(params: util.Map[Any, Any]): util.List[T] = {
    ref.foreach { r => r.result(params) } // run the refs first
    if (compositor != null && compositor.size() > 0) {
      // the first Compositor produces the first intermediate result
      var middleR = compositor.get(0).result(processor, ref, null, params)
      // feed the new intermediate result into the next Compositor:
      // the first Compositor's output becomes the second one's input, the second one's output becomes the third one's input, and so on,
      // so the Compositors execute as a chain
      for (i <- 1 until compositor.size()) {
        middleR = compositor.get(i).result(processor, ref, middleR, params)
      }
      middleR
    } else new util.ArrayList[T]()
  }  
}

Note: every job in the configuration file has a strategy-level configParams, and the refs use this global configParams as well.
It is a Map[String, Any]. Each Compositor and Processor additionally has its own params configuration, which is a list.

In practice, the global configParams is what gets passed as the last argument of the result() methods of Strategy, Ref/Processor and Compositor.

    "compositor": [
      {
        "name": "testCompositor",
        "params": [
          {
            "sql": "select avg(value) avgAge from test",
            "outputTableName": "test3"
          },
          {
            "sql": "select sum(value) sumAge from test",
            "outputTableName": "test4"
          }
        ]
      }
    ],

Next, take the Compositor implementation that reads multiple data sources as an example:

  • _configParams is set when the Compositor is initialized at creation time; it is a List[Map[String, Any]] corresponding to the params list in the configuration
  • If values need to be substituted, the configuration is processed first
  • Then the SparkSession is retrieved from params (remember it was put into the map when the Runtime was created?)
  • Next, sparkSession.read.format(xx).options(Map).load(path) is executed
  • Finally, df.createOrReplaceTempView creates a Spark SQL temporary view named after outputTable
class MultiSQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
  private var _configParams: util.List[util.Map[Any, Any]] = _

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {

    _configParams.foreach { sourceConfig =>
      val name = sourceConfig.getOrElse("name", "").toString

      val _cfg = sourceConfig.map(f => (f._1.toString, f._2.toString)).map { f =>
        (f._1, params.getOrElse(s"streaming.sql.source.${name}.${f._1}", f._2).toString)
      }.toMap

      val sourcePath = _cfg("path")
      val df = sparkSession(params).read.format(sourceConfig("format").toString).options(
        (_cfg - "format" - "path" - "outputTable").map(f => (f._1.toString, f._2.toString))).load(sourcePath)
      df.createOrReplaceTempView(_cfg.getOrElse("outputTable", _cfg.getOrElse("outputTableName", "")))
    }
    List()
  }
}

To support dynamic substitution of configuration values, the _cfg map is post-processed: for each key, a value passed at submit time under s"streaming.sql.source.${name}.${f._1}" overrides the configured value f._2 if present.
The table below lists the substitution patterns StreamingPro supports.

Config parameter | Example in the config file | Dynamic submit-time argument
streaming.sql.source.[name].[param] | "path": "file:///tmp/sample_article.txt" | -streaming.sql.source.firstSource.path file:///tmp/wow.txt
streaming.sql.out.[name].[param] | "path": "file:///tmp/sample_article.txt" | -streaming.sql.out.firstSink.path file:///tmp/wow_20170101.txt
streaming.sql.params.[param-name] | "sql": "select * from test where hp_time=:today" | -streaming.sql.params.today "20170101"

Suppose we have two input sources and one output sink configured as follows:

      {
        "name": "batch.sources",
        "params": [
          {
            "name":"firstSource",
            "path": "file:///tmp/sample_article.txt",
            "format": "com.databricks.spark.csv",
            "outputTable": "article",
            "header":true
          },
          {
              "name":"secondSource",
              "path": "file:///tmp/sample_article2.txt",
              "format": "com.databricks.spark.csv",
              "outputTable": "article2",
              "header":true
            }
        ]
      },
      {
        "name": "batch.outputs",
        "params": [
          {
            "name":"firstSink",
            "path": "file:///tmp/sample_article.txt",
            "format": "com.databricks.spark.csv",
            "outputTable": "article",
            "header":true
          }
        ]
      }

The job of a source compositor is to read the input into a DataFrame and then register it as a temporary view; other components, such as the SQL compositor, work in a similar way. This concludes the walkthrough of StreamingPro's overall flow.

Can database queries use like?

The database-related code in the source looks something like this:
from(DB.tSparkJobParameter)(s => where(s.id === id) select (s)).singleOption
Can this kind of database query use like when matching strings? For example:
from(DB.tSparkJobParameter)(s => where(s.name like name) select (s)).singleOption
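For reference, the ORM in that snippet appears to be Squeryl, which does provide a like operator for string fields, so a query of this shape should be expressible. A hedged sketch, reusing the table and field names from the question (assumed to exist exactly as shown above, with the Squeryl DSL in scope):

// Hedged sketch only: DB.tSparkJobParameter and the name field are taken from the snippet
// above; the Squeryl primitive-type DSL must be imported for `like` to be available.
import org.squeryl.PrimitiveTypeMode._

def findByNamePattern(pattern: String) =
  from(DB.tSparkJobParameter)(s =>
    where(s.name like pattern)   // e.g. pattern = "%interactive%"
      select (s)
  ).singleOption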

XQL error

Following the example, I connected to a MySQL database and got the following error:
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:526)
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:505)
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:479)
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:489)
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:72)
com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1606)
com.mysql.cj.jdbc.ConnectionImpl.(ConnectionImpl.java:633)
com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:347)
com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:219)
org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:61)
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:52)
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(JDBCRelation.scala:113)
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:306)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
streaming.dsl.LoadAdaptor$$anonfun$parse$1.apply(LoadAdaptor.scala:34)
streaming.dsl.LoadAdaptor$$anonfun$parse$1.apply(LoadAdaptor.scala:14)
scala.collection.immutable.Range.foreach(Range.scala:160)
streaming.dsl.LoadAdaptor.parse(LoadAdaptor.scala:14)
...
After switching to Hive, the error is as follows:
org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:68)
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
streaming.dsl.SelectAdaptor.parse(SelectAdaptor.scala:17)
streaming.dsl.ScriptSQLExecListener.exitSql(ScriptSQLExec.scala:76)
streaming.dsl.parser.DSLSQLParser$SqlContext.exitRule(DSLSQLParser.java:226)
org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:71)
org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:54)
org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:52)
streaming.dsl.ScriptSQLExec$.parse(ScriptSQLExec.scala:54)
streaming.rest.RestController.script(RestController.scala:83)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
net.csdn.modules.http.RestController.filter(RestController.java:139)
net.csdn.modules.http.RestController.dispatchRequest(RestController.java:99)
net.csdn.modules.http.HttpServer$DefaultHandler.handle(HttpServer.java:184)
org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
org.eclipse.jetty.server.Server.handle(Server.java:499)
org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
java.lang.Thread.run(Thread.java:745)

Error when submitting the application

The submit command was: ./bin/spark-submit --master local[2] --class streaming.core.StreamingApp --name test ../spark-2.1.0/streamingpro-spark-2.0-0.4.15-SNAPSHOT.jar -streaming.name test -streaming.platform spark -streaming.job.file.path file:///./test.json

The error message is:
Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; at streaming.core.strategy.platform.PlatformManager$.platformNameMapping(PlatformManager.scala:212) at streaming.core.strategy.platform.PlatformManager$.getRuntime(PlatformManager.scala:194) at streaming.core.strategy.platform.PlatformManager.run(PlatformManager.scala:82) at streaming.core.StreamingApp$.main(StreamingApp.scala:14) at streaming.core.StreamingApp.main(StreamingApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Can the manager load its configuration dynamically?

I have recently been doing secondary development on the manager. Every time I modify application.yml I have to recompile. Is there a way to make the manager load its configuration dynamically, so that changing things like the IP address or port does not require a full rebuild each time? It is quite painful.

Initialization dependencies between multiple jobs in the same configuration file

Problem description:

Multiple jobs belonging to the same kind of business are placed in one configuration file and have no dependencies on each other. When executing one of them, I found that every job in that configuration file gets initialized.

This raises the following concern:
if one job fails to initialize because of its configuration or for some other reason, it will affect the other jobs and cause them to fail as well.

Jars in the net folder

Hi, I would like to add my own operators (such as reduce) to StreamingPro. Could you upload the net folder from the project's local repository? Building the project fails because the jars inside it are missing. Thanks.

How to add UDFs when using Spark SQL

After registering a UDF in SparkRuntime, the UDF can be used through the Spark SQL HTTP endpoint (/run/sql). But when going through the Thrift server started by Spark SQL, i.e. connecting via jdbc:hive2:, the UDF cannot be used and the error says it is not registered in the default database. How can this be solved?

splitflatmap

The real map operator has the form map(func), and I would like to implement mine in the same shape; the tricky part is that the map parameter is a function.
I wrote:
def sep = {
config[String]("separator", _configParams)
}
This gives me the separator as a string, which I want to pass into func,
but in the end it does not compile. Is there a good way to do this, or is this approach simply not feasible? Any guidance is appreciated.

Table 'spark_job.t_spark_job_parameter' doesn't exist

When I visit this page (http://{host}:9004/submit_job.html), I encounter this error:

(screenshot)

I checked spark_jobs_2017-07-18.sql, but I did not find any definition of the table 'spark_job.t_spark_job_parameter'.

Error: SparkRuntime cannot be cast to SparkStreamingRuntime

When I run
./bin/spark-submit --class streaming.core.StreamingApp
--master local[2]
--name sql-interactive
/tmp/streamingpro-0.2.1-SNAPSHOT-online-1.6.1.jar
-streaming.name sql-interactive
-streaming.platform spark
-streaming.rest true
-streaming.driver.port 9004
-streaming.spark.service true

it gives me this error:
Exception in thread "main" java.lang.ClassCastException: streaming.core.strategy.platform.SparkRuntime cannot be cast to streaming.core.strategy.platform.SparkStreamingRuntime

Any help would be appreciated.

Thanks

error

In RestController:
if (!runtime.isInstanceOf[SparkStreamingRuntime]) — the ! needs to be added in front of the condition in the if; just letting you know.
Also, if spark.driver.allowMultipleContexts = true is set at startup, does performance suffer a lot?
It does make it possible to add jobs dynamically, though.

streamingpro-manager fails to start

Start Params:
java -cp ./streamingpro-manager-0.4.15-SNAPSHOT.jar streaming.App -yarnUrl dmp2:8088 -jdbcPath ./conf/jdbc.properties -envPath ./conf/env.properties

Error message:
(screenshot)

jdbc.properties:
(screenshot)

Error when loading HBase with XQL

java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/mapreduce/TableInputFormatBase
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at org.apache.spark.sql.hive.client.HiveClientImpl$.org$apache$spark$sql$hive$client$HiveClientImpl$$toInputFormat(HiveClientImpl.scala:813)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$toHiveTable$4.apply(HiveClientImpl.scala:859)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$toHiveTable$4.apply(HiveClientImpl.scala:859)
at scala.Option.map(Option.scala:146)
at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:859)
at org.apache.spark.sql.hive.execution.HiveTableScanExec.org$apache$spark$sql$hive$execution$HiveTableScanExec$$hiveQlTable$lzycompute(HiveTableScanExec.scala:84)
at org.apache.spark.sql.hive.execution.HiveTableScanExec.org$apache$spark$sql$hive$execution$HiveTableScanExec$$hiveQlTable(HiveTableScanExec.scala:84)
at org.apache.spark.sql.hive.execution.HiveTableScanExec.tableDesc$lzycompute(HiveTableScanExec.scala:86)
at org.apache.spark.sql.hive.execution.HiveTableScanExec.tableDesc(HiveTableScanExec.scala:85)
at org.apache.spark.sql.hive.execution.HiveTableScanExec.org$apache$spark$sql$hive$execution$HiveTableScanExec$$hadoopReader$lzycompute(HiveTableScanExec.scala:102)
at org.apache.spark.sql.hive.execution.HiveTableScanExec.org$apache$spark$sql$hive$execution$HiveTableScanExec$$hadoopReader(HiveTableScanExec.scala:99)
at org.apache.spark.sql.hive.execution.HiveTableScanExec$$anonfun$10.apply(HiveTableScanExec.scala:184)
at org.apache.spark.sql.hive.execution.HiveTableScanExec$$anonfun$10.apply(HiveTableScanExec.scala:184)
at org.apache.spark.util.Utils$.withDummyCallSite(Utils.scala:2478)
at org.apache.spark.sql.hive.execution.HiveTableScanExec.doExecute(HiveTableScanExec.scala:183)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:298)
at org.apache.spark.sql.execution.QueryExecution.hiveResultString(QueryExecution.scala:133)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:340)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:248)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.TableInputFormatBase
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

Query UI sort bug

Every column in the StreamingPro Query UI is treated as a string.

The Spark log reports errors but YARN's finalStatus is SUCCEEDED

There are many similar places; take MultiSQLSourceCompositor.scala as an example:
val df = session.read.format(sourceConfig("format").toString).options((_cfg - "format" - "path" -"outputTable").map(f => (f._1.toString, f._2.toString))).load(sourcePath)

If an exception occurs here, it is eventually caught in serviceframework's dispatch function, but only the error message is printed and the Spark application is not terminated. As a result, the Spark log reports errors while YARN reports finalStatus SUCCEEDED. You can test this yourself.
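A minimal sketch of the pattern being described (illustrative, not the actual StrategyDispatcher code): an exception swallowed inside dispatch lets the driver exit normally, so YARN reports finalStatus=SUCCEEDED even though the log is full of errors; rethrowing (or exiting non-zero) would propagate the failure:

// Illustrative only. The first helper swallows and logs, which is what the reporter observes;
// the second is one possible fix that lets spark-submit / YARN see the failure.
def dispatchSwallowing(run: () => Unit): Unit =
  try run() catch {
    case e: Exception => e.printStackTrace()   // logged only; the app still "succeeds"
  }

def dispatchFailingFast(run: () => Unit): Unit =
  try run() catch {
    case e: Exception =>
      e.printStackTrace()
      throw e                                  // failure propagates to YARN's finalStatus
  }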

ML SQL design

A quick question regarding the design of MLSQL.
Does MLSQL parse predefined SQL keywords, such as train, to generate Scala or Python code that runs against Spark MLlib?

Running the test case fails

I followed http://www.jianshu.com/p/d10edd6c7cf9 to run the test.
The command is:
spark-submit --class streaming.core.StreamingApp --master local --name test streamingpro-spark-0.4.14-SNAPSHOT-jar-with-dependencies.jar -streaming.name test -streaming.platform spark -streaming.job.file.path hdfs://XXXX:8020/streaming/test.json
but it fails with the following error:
17/06/28 09:16:35 INFO Executor: Adding file:/tmp/spark-72838dd1-4362-453b-b6ac-86e8c888cca8/userFiles-7572807c-6955-4884-8cbf-af1af3cde129/streamingpro-spark-0.4.14-SNAPSHOT-jar-with-dependencies.jar to class loader
17/06/28 09:16:35 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1636 bytes result sent to driver
17/06/28 09:16:35 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 353 ms on localhost (1/1)
17/06/28 09:16:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/06/28 09:16:35 INFO DAGScheduler: ResultStage 0 (json at MockJsonCompositor.scala:31) finished in 0.369 s
17/06/28 09:16:35 INFO DAGScheduler: Job 0 finished: json at MockJsonCompositor.scala:31, took 0.588449 s
17/06/28 09:16:35 ERROR StrategyDispatcher: 调用链路异常
java.lang.ClassCastException: streaming.core.strategy.platform.SparkRuntime cannot be cast to streaming.core.strategy.platform.SparkStreamingRuntime
at streaming.core.compositor.spark.streaming.source.MockInputStreamCompositor.result(MockInputStreamCompositor.scala:32)
at streaming.core.strategy.SparkStreamingStrategy.result(SparkStreamingStrategy.scala:52)
at serviceframework.dispatcher.StrategyDispatcher$$anonfun$dispatch$2.apply(StrategyDispatcher.scala:65)
at serviceframework.dispatcher.StrategyDispatcher$$anonfun$dispatch$2.apply(StrategyDispatcher.scala:63)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at serviceframework.dispatcher.StrategyDispatcher.dispatch(StrategyDispatcher.scala:62)
at streaming.core.strategy.platform.PlatformManager$$anonfun$run$3.apply(PlatformManager.scala:120)
at streaming.core.strategy.platform.PlatformManager$$anonfun$run$3.apply(PlatformManager.scala:118)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at streaming.core.strategy.platform.PlatformManager.run(PlatformManager.scala:117)
at streaming.core.StreamingApp$.main(StreamingApp.scala:14)
at streaming.core.StreamingApp.main(StreamingApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/06/28 09:16:35 INFO SparkContext: Invoking stop() from shutdown hook

What is going wrong here?

A question about Spark Streaming with MongoDB

I recently saw the Spark Streaming demos you wrote in Scala; they are very helpful for my work on Spark Streaming with MongoDB. I am still a bit lost about how to combine Spark Streaming and MongoDB properly. Any advice would be much appreciated.

Exception in thread "main" net.csdn.common.exception.FailedToResolveConfigException: Failed to resolve config path [strategy.v2.json]

./bin/spark-submit --class streaming.core.StreamingApp --master local[2] --name sql-interactive /data0/chaochao1/streamingpro-0.4.1-SNAPSHOT-online-2.0.0.jar -streaming.name sql-interactive -streaming.platform spark -streaming.rest true -streaming.driver.port 9004 -streaming.spark.service true

Running the command above produces the error below. Is a configuration file missing? What should I refer to?
I followed this wiki:
https://github.com/allwefantasy/streamingpro/wiki/%E5%88%A9%E7%94%A8StreamingPro%E5%AE%9E%E7%8E%B0SQL-%E4%BA%A4%E4%BA%92%E5%BC%8F%E6%9F%A5%E8%AF%A2

spark/spark-2.0.0-bin/config/strategy.v2.json], and classpath
at net.csdn.common.env.Environment.resolveConfig(Environment.java:223)
at net.csdn.common.env.Environment.resolveConfigAndLoadToString(Environment.java:186)
at serviceframework.dispatcher.StrategyDispatcher.loadConfig(StrategyDispatcher.scala:115)
at serviceframework.dispatcher.StrategyDispatcher$.getOrCreate(StrategyDispatcher.scala:218)
at streaming.core.Dispatcher$.dispatcher(Dispatcher.scala:39)
at streaming.core.strategy.platform.PlatformManager.findDispatcher(PlatformManager.scala:34)
at streaming.core.strategy.platform.PlatformManager.run(PlatformManager.scala:87)
at streaming.core.StreamingApp$.main(StreamingApp.scala:14)
at streaming.core.StreamingApp.main(StreamingApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Maven build error with Spark 2.1 and CarbonData 1.1.0

Does Spark not yet support its JDBC component for Spark 2.1 / Scala 2.11?
The error is as below:

[ERROR] D:\forstr\streamingpro-car\streamingpro-spark-2.0\src\main\java\streaming\core\strategy\platform\SparkRuntime.scala:9: error: object thriftserver is not a member of package org.apache.spark.sql.hive
[INFO] import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
[INFO] ^
[ERROR] D:\forstr\streamingpro-car\streamingpro-spark-2.0\src\main\java\streaming\core\strategy\platform\SparkRuntime.scala:94: error: not found: value HiveThriftServer2
[INFO] HiveThriftServer2.startWithContext(sparkSession.sqlContext.asInstanceOf[HiveContext])

Compiler error

On Windows, I ran git clone https://github.com/allwefantasy/csdn_common.git
and got the following error while building the first module:
[ERROR] if (keys.length > 2) throw new ArgumentErrorException("不支持超过三级层次的参数传▒??");
[ERROR] ^
[ERROR]
[ERROR] Command line was: "C:\Program Files\Java\jdk1.8.0_60\jre..\bin\javadoc.exe" @options @packages
[ERROR]
[ERROR] Refer to the generated Javadoc files in 'D:\code\spark_test\csdn_common\target\apidocs' dir.
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]

The manager page cannot be accessed

There is no error after startup, but the page cannot be reached through port 9000.
[2018-02-28 10:25:24,015][INFO ][bootstrap.loader.impl ] scan service package => null
[2018-02-28 10:25:24,016][INFO ][bootstrap.loader.impl ] load service in ServiceFramwork.serviceModules =>0
[2018-02-28 10:25:24,016][INFO ][bootstrap.loader.impl ] total load service =>8
[2018-02-28 10:25:24,304][INFO ][org.eclipse.jetty.util.log] Logging initialized @555ms
[2018-02-28 10:25:24,556][INFO ][org.eclipse.jetty.server.Server] jetty-9.2.z-SNAPSHOT
[2018-02-28 10:25:24,576][INFO ][org.eclipse.jetty.server.ServerConnector] Started ServerConnector@479a28ae{HTTP/1.1}{:9000}
[2018-02-28 10:25:24,576][INFO ][org.eclipse.jetty.server.Server] Started @828ms

CRUD problem

When I use curl to try to add a new job, I get:
curl: (7) Failed to connect to 10.108.166.96 port 9902: Connection refused

SQL

I do not quite understand how to query HDFS with SQL. Is there a concrete, practical example I could refer to?

The query result UI drops a column

When the query result is the following:

[
  {
    "ac": "block"
  },
  {
    "ac": "gslb"
  },
  {
    "ac": "time",
    "pt": 14
  },
  {
    "ac": "play"
  },
  {
    "ac": "block"
  },
  {
    "ac": "play"
  },
  {
    "ac": "end"
  },
  {
    "ac": "time",
    "pt": 5
  },
  {
    "ac": "init"
  },
  {
    "ac": "init"
  },
  {
    "ac": "time",
    "pt": 8
  },
  {
    "ac": "end"
  },
  {
    "ac": "gslb"
  },
  {
    "ac": "block"
  },
  {
    "ac": "gslb"
  },
  {
    "ac": "init"
  },
  {
    "ac": "block"
  },
  {
    "ac": "end"
  },
  {
    "ac": "time",
    "pt": 0
  }
]

The pt column is missing from the query result UI.

Writing to HDFS is very slow

The job's stages always get stuck at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217) streaming.dsl.SaveAdaptor.parse(SaveAdaptor.scala:48)

Spark Streaming reading data from Kafka into JDBC fails with java.lang.Exception: Could not compute split

After running for a while the error occurs; Kafka is consumed via createDirectStream:
[2017-08-12 17:00:59,725][WARN ][org.apache.spark.scheduler.TaskSetManager] Lost task 0.0 in stage 102.0 (TID 7448, worker16.hadoop, executor 40): java.lang.Exception: Could not compute split, block input-0-1502528277400 of RDD 189 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:50)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Asking for ideas: adding, deleting or updating jobs dynamically without restarting Spark Streaming

Hi, we currently have this requirement, but after looking at how spark-submit submits a job, it does not look easy to implement.
After submission, spark-submit performs a whole series of operations, such as initializing the SparkContext, creating the DAG scheduler and the task scheduler, requesting resources from the ResourceManager, and allocating executor threads.
My understanding is that to add a job dynamically, the running Spark Streaming application would have to perform the operations above itself; without a deep understanding of the Spark code, that is not easy to implement.
What is your approach? Many thanks!

Permission problem with SQL queries when StreamingPro runs in cluster mode

The problem is described below:
1) Starting in client mode, everything works fine:
./bin/spark-submit --class streaming.core.StreamingApp
--master yarn
--name sql-interactive
/usr/local/fqlhadoop/streamingpro/streamingpro-spark-2.0-1.0.0.jar
-streaming.name sql-interactive
-streaming.job.file.path hdfs://haclusterdev/streamingpro/test.json
-streaming.platform spark
-streaming.rest true
-streaming.driver.port 9003
-streaming.spark.service true
-streaming.thrift true
-streaming.enableHiveSupport true
The SQL queries are:
Query a specific table:
curl --request POST --url http://3.hadoopdev.com:9003/run/sql --data 'sql=select * from dp_monitor.t_aggregate_provider limit 10'
List all databases:
curl --request POST --url http://3.hadoopdev.com:9003/run/sql --data 'sql=show databases'
Both the REST and Thrift interfaces return correct results.
2) Starting in cluster mode, there are problems:
./bin/spark-submit --class streaming.core.StreamingApp
--master yarn
--deploy-mode cluster
--name sql-interactive
/usr/local/fqlhadoop/streamingpro/streamingpro-spark-2.0-1.0.0.jar
-streaming.name sql-interactive
-streaming.job.file.path hdfs://haclusterdev/streamingpro/test.json
-streaming.platform spark
-streaming.rest true
-streaming.driver.port 9003
-streaming.spark.service true
-streaming.thrift true
-streaming.enableHiveSupport true
-streaming.zk.servers 1.hadoopdev.com:2181
-streaming.zk.conf_root_dir /streamingpro/xx
The SQL queries misbehave:
Listing all databases (same SQL as above) returns:
[{"databaseName":"default"}] — only the default database is found.
Querying a specific table returns:
Table or view not found: dp_monitor.t_aggregate_provider; line 1 pos 14;
'GlobalLimit 10
+- 'LocalLimit 10
+- 'Project [*]
+- 'UnresolvedRelation dp_monitor.t_aggregate_provider

The cluster-mode driver log shows the following:

[2017-11-20 19:02:56,852][INFO ][org.apache.spark.sql.execution.SparkSqlParser] Parsing command: select * from dp_monitor.t_aggregate_provider limit 10
[2017-11-20 19:02:57,218][INFO ][org.apache.hadoop.hive.metastore.HiveMetaStore] 1: get_database: dp_monitor
[2017-11-20 19:02:57,219][INFO ][org.apache.hadoop.hive.metastore.HiveMetaStore.audit] ugi=biadmin ip=unknown-ip-addr cmd=get_database: dp_monitor
[2017-11-20 19:02:57,220][INFO ][org.apache.hadoop.hive.metastore.HiveMetaStore] 1: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
[2017-11-20 19:02:57,224][INFO ][org.apache.hadoop.hive.metastore.ObjectStore] ObjectStore, initialize called
[2017-11-20 19:02:57,243][INFO ][DataNucleus.Query ] Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
[2017-11-20 19:02:57,244][INFO ][org.apache.hadoop.hive.metastore.MetaStoreDirectSql] Using direct SQL, underlying DB is DERBY
[2017-11-20 19:02:57,245][INFO ][org.apache.hadoop.hive.metastore.ObjectStore] Initialized ObjectStore
[2017-11-20 19:02:57,247][WARN ][org.apache.hadoop.hive.metastore.ObjectStore] Failed to get database dp_monitor, returning NoSuchObjectException
[2017-11-20 19:02:57,387][INFO ][modules.http.processor.impl] Completed 500 in 554ms POST /run/sql

[2017-11-20 19:03:01,051][INFO ][org.apache.spark.sql.execution.SparkSqlParser] Parsing command: show databases
[2017-11-20 19:03:01,334][INFO ][org.apache.hadoop.hive.metastore.HiveMetaStore] 2: get_databases: *
[2017-11-20 19:03:01,334][INFO ][org.apache.hadoop.hive.metastore.HiveMetaStore.audit] ugi=biadmin ip=unknown-ip-addr cmd=get_databases: *
[2017-11-20 19:03:01,334][INFO ][org.apache.hadoop.hive.metastore.HiveMetaStore] 2: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
[2017-11-20 19:03:01,336][INFO ][org.apache.hadoop.hive.metastore.ObjectStore] ObjectStore, initialize called
[2017-11-20 19:03:01,343][INFO ][DataNucleus.Query ] Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
[2017-11-20 19:03:01,343][INFO ][org.apache.hadoop.hive.metastore.MetaStoreDirectSql] Using direct SQL, underlying DB is DERBY
[2017-11-20 19:03:01,343][INFO ][org.apache.hadoop.hive.metastore.ObjectStore] Initialized ObjectStore
[2017-11-20 19:03:02,346][INFO ][org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator] Code generated in 294.131774 ms
[2017-11-20 19:03:02,433][INFO ][org.apache.spark.SparkContext] Starting job: collect at RestController.scala:125
[2017-11-20 19:03:02,452][INFO ][org.apache.spark.scheduler.DAGScheduler] Got job 0 (collect at RestController.scala:125) with 1 output partitions
[2017-11-20 19:03:02,453][INFO ][org.apache.spark.scheduler.DAGScheduler] Final stage: ResultStage 0 (collect at RestController.scala:125)
[2017-11-20 19:03:02,453][INFO ][org.apache.spark.scheduler.DAGScheduler] Parents of final stage: List()
[2017-11-20 19:03:02,455][INFO ][org.apache.spark.scheduler.DAGScheduler] Missing parents: List()
[2017-11-20 19:03:02,465][INFO ][org.apache.spark.scheduler.DAGScheduler] Submitting ResultStage 0 (MapPartitionsRDD[4] at collect at RestController.scala:125), which has no missing parents
[2017-11-20 19:03:02,533][INFO ][org.apache.spark.storage.memory.MemoryStore] Block broadcast_0 stored as values in memory (estimated size 7.0 KB, free 366.3 MB)
[2017-11-20 19:03:02,741][INFO ][org.apache.spark.storage.memory.MemoryStore] Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.6 KB, free 366.3 MB)
[2017-11-20 19:03:02,745][INFO ][org.apache.spark.storage.BlockManagerInfo] Added broadcast_0_piece0 in memory on 10.1.50.222:38001 (size: 3.6 KB, free: 366.3 MB)
[2017-11-20 19:03:02,747][INFO ][org.apache.spark.SparkContext] Created broadcast 0 from broadcast at DAGScheduler.scala:1006
[2017-11-20 19:03:02,763][INFO ][org.apache.spark.scheduler.DAGScheduler] Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[4] at collect at RestController.scala:125) (first 15 tasks are for partitions Vector(0))
[2017-11-20 19:03:02,764][INFO ][org.apache.spark.scheduler.cluster.YarnClusterScheduler] Adding task set 0.0 with 1 tasks
[2017-11-20 19:03:02,823][INFO ][org.apache.spark.scheduler.TaskSetManager] Starting task 0.0 in stage 0.0 (TID 0, 3.hadoopdev.com, executor 1, partition 0, PROCESS_LOCAL, 4942 bytes)
[2017-11-20 19:03:03,293][INFO ][org.apache.spark.storage.BlockManagerInfo] Added broadcast_0_piece0 in memory on 3.hadoopdev.com:38003 (size: 3.6 KB, free: 366.3 MB)
[2017-11-20 19:03:03,983][INFO ][org.apache.spark.scheduler.TaskSetManager] Finished task 0.0 in stage 0.0 (TID 0) in 1192 ms on 3.hadoopdev.com (executor 1) (1/1)
[2017-11-20 19:03:03,986][INFO ][org.apache.spark.scheduler.cluster.YarnClusterScheduler] Removed TaskSet 0.0, whose tasks have all completed, from pool
[2017-11-20 19:03:03,992][INFO ][org.apache.spark.scheduler.DAGScheduler] ResultStage 0 (collect at RestController.scala:125) finished in 1.206 s
[2017-11-20 19:03:03,997][INFO ][org.apache.spark.scheduler.DAGScheduler] Job 0 finished: collect at RestController.scala:125, took 1.563610 s
[2017-11-20 19:03:04,041][INFO ][org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator] Code generated in 10.83304 ms
[2017-11-20 19:03:04,049][INFO ][modules.http.processor.impl] Completed 200 in 3000ms POST /run/sql

Judging from the WARN line in the log, is this a permission problem when accessing Hive? Hive and the driver are started by the same user, and client and cluster modes also run as the same user. In cluster mode both the REST and Thrift queries show the same problem. Or is there some other configuration parameter that needs to be set?

Adding jobs dynamically

Right now all jobs are placed in a single Spark app, so they all share the same execution interval. Is there a way to give different jobs different intervals? Also, if I have many jobs, will putting them all in one app cause problems?

com.google.inject.CreationException: Guice creation errors

Version
1.0.0

Reproduce conditions

spark-submit  \
--conf "spark.ui.port=4041" \
--conf "spark.sql.codegen=true" \
--class streaming.core.StreamingApp \
--master local[2] \
--name sql-interactive \
$SHOME/streamingpro-spark-2.0-1.0.0.jar  \
-streaming.name sql-interactive    \
-streaming.job.file.path file://$SHOME/empty.json \
-streaming.platform spark   \
-streaming.rest true   \
-streaming.driver.port 9003   \
-streaming.spark.service true \
-streaming.thrift true \
-streaming.enableHiveSupport true

When I run StreamingPro in REST mode, I encounter this problem:
(screenshots)

[ERROR][modules.http ] System processing error net.csdn.common.exception.RecordNotFoundException: 你请求的URL地址[/sqlui]不存在

spark-submit --class streaming.core.StreamingApp --master local[2] --name sql-interactive ./streamingpro-spark-2.0-0.4.15-SNAPSHOT.jar -streaming.name sql-interactive -streaming.job.file.path hdfs://localhost:9000/tmp/test.json -streaming.platform spark -streaming.rest true -streaming.driver.port 9004 -streaming.spark.service true

Running the command above starts the application normally, but visiting the page [http://127.0.0.1:9004/sqlui] produces the error above. How should this be handled?

Two questions

1. For future PRs, should I create a feature branch and target develop? I mistakenly targeted master before and you merged it.

2. I wrote documentation for the newly added RegexParserCompositor (a regex parser); which directory should I put it in when submitting?

Hi, is there any documentation on operations?

Hi!
This open-source project looks very good and I would like to learn a bit more. Could you help answer the following?

  1. Are there any processing metrics, for example the current Kafka processing latency and how many messages have been processed?
  2. How can the logs be viewed conveniently, and what is the general path for troubleshooting a problem?
  3. What is a convenient way to develop and debug?

Thanks!
