Giter Site home page Giter Site logo

wedatasphere's Introduction

English | 中文

WeDataSphere Open Source Components

Project with the blue "S" ball in the image below is open-sourced. Including DataSphere Studio, Linkis, Scriptis, Qualitis, Schedulis, Exchangis, Visualis, Prophecis, Streamis.
OSProjects

Click me to Github repo

Linkis builds a computation middleware layer to decouple the upper applications and the underlying data engines, provides standardized interfaces (REST, JDBC, WebSocket etc.) to easily connect to various underlying engines (Spark, Presto, Flink, etc.), while enables cross engine context sharing, unified job& engine governance and orchestration.

Click me to Github repo

DataSphere Studio is positioned as a data application development portal, and the closed loop covers the entire process of data application development. With a unified UI, the workflow-like graphical drag-and-drop development experience meets the entire lifecycle of data application development from data import, desensitization cleaning, data analysis, data mining, quality inspection, visualization, scheduling to data output applications, etc.

Click me to Github repo

Scriptis is for interactive data analysis with script development(SQL, Pyspark, HiveQL), task submission(Spark, Hive), UDF, function, resource management and intelligent diagnosis.

Click me to Github repo

Qualitis is a one-stop data quality management platform that supports quality verification, notification, and management for various datasource. It is used to solve various data quality problems caused by data processing.

Click me to Github repo

Schedulis is a high performance workflow task scheduling system that supports high availability and multi-tenant financial level features, Linkis computing middleware, and has been integrated into data application development portal DataSphere Studio

Click me to Github repo

Exchangis is a lightweight,highly extensible data exchange platform that supports data transmission between structured and unstructured heterogeneous data sources. On the application layer, it has business features such as data permission management and control, high availability of node services and multi-tenant resource isolation. On the data layer, it also has architectural characteristics such as diversified transmission architecture, module plug-in and low coupling of components.

Click me to Github repo

Visualis Visualis is an open source project based on Yixin Davinci Developed data visualization Bi tool. It has been integrated into the data application development portal Datasphere Studio in this release, Visualis 1.0.0 supports Linkis 1.1.1 and DSS 1.1.0.

Click me to Github repo

Prophecis is a one-stop machine learning platform developed by WeBank. It integrates multiple open-source machine learning frameworks, has the multi tenant management capability of machine learning compute cluster, and provides full stack container deployment and management services for production environment.

Click me to Github repo

Streamis is an jointed development project for Streaming application development and management established by WeBank, CtYun, Samoyed Financial Cloud and XianWeng Technology.

More open-source WDS components? Coming soon...

WeDataSphere Introduction

WeDataSphere is a financial level one-stop open-source suitcase for big data platforms. The fundamental platform consists of 4 layers for data exchange, data distribution, computation and storage; The functional platform consists of 3 layers for platform tools, data tools and application tools, focusing on the implementations of various user requirements about functional tools. These construct as a complete technical ecosystem of big data platform and provides one-stop sufficient components and functionalities support.

WeDataSphere Core Features

  • Fundamental capabilities

Powered by miscellaneous open-source components contributed by the community, such as Hadoop, Spark, Hbase, KubeFlow adn FFDL, WeDataSphere achieves financial level reliability on infrastructural data computation, storage and exchange. It also contributes some enhancements to those open-source versions by addressing security, performance, availability and manageability issues in practice with bug fixes.

  • Platform tools

Consists of a platform portal, a data middleware(Linkis) and an operation management system. The platform portal supports product map, financial expense calculation and cloud service application; As a data middleware, Linkis links concrete applications up with underlying computation/storage systems with capabilities of financial level multi-tenant, resource governance and access isolation, filling gaps for the open-source community and the industry; The operation management system encompasses cluster management, configuration management, change management and service request automation, supports one-click installation, one-click upgrade and graphical operation&maintenance, and provides functionalities of alert, health monitoring&diagnosis and automatic recovery, simplifying the operation&maintenance process of the platform.

  • Data tools

Consists of data map, data desensitization, data quality and data exchange tools across different Hadoop clusters. Data map manages the universal data resource of the whole bank, with components of meta-data management, data access control, data consanguinity and the on-developing data quality and data model functions. Data desensitization desensitizes highly confidential data and keeps users from accessing it directly. The data quality tool provides a unique process to define and detect the quality of datasets with immediate problem reporting. The data exchange tools across different Hadoop clusters supports the scheduling, monitoring, statistics and management for data exchange tasks.

  • Application tools

Consists of the development&exploration tool(Scriptis), a graphical workflow scheduling system, a data visualization BI tool and a machine learning support system. Scriptis connects with various computation/storage engines with graphical interface and multi development languages support. The graphical workflow scheduling system provides a graphical interface for workflow definition, job execution, dependency reveal, status display, historical statistics and monitoring configuration. The data visualization BI tool generates various charts by drag&drop operations and simple scripting, with scheduled email available. The machine learning support system supports multiple model training mode, including both self-developed ML algorithms and open-source ML frameworks, with multi-tenant management alility for high-performance clusters.

WeDataSphere major Advantages


![WDSAdvantages](https://github.com/WeBankFinTech/WeDataSphere/blob/master/images/introduction/WDSAdvantages.png)
  • One stop

    The 3 layers of platform tools, data tools and application tools plus the powerful machine learning capability, build up an enterprise big data solution.

  • Synchronization across clusters among 3 datacenters in 2 cities

    Effecient&reliable big data transportation across clusters/IDCs, with sophisticated data backup and disaster tolerance solutions.

  • Financial grade

    Unified security control, fully container/microservice adoption and multi-tenant isolation for different layers.

  • Seamless expirence

    The unique data middleware(Linkis) links up systems in different layers, bringing data consanguinity, code reusability and user resources altogether.

  • Open source

    Core components already open source, the rest coming soon.

WeDataSphere Community

If you desire immediate response, please kindly raise issues to us or scan the below QR code by WeChat and QQ to join our group:
weChatAndQQ

wedatasphere's People

Contributors

alexzywu avatar peacewong avatar ritakang0451 avatar sargentti avatar wushengyeyouya avatar yangli-os avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

wedatasphere's Issues

【有奖征文】天翼云数据中台工具的选型应用以及后续思路

背景

我们曾经的大数据平台,是基于CDH构建的,随着大数据两大商业公司CDH和HDP宣布合并,将不再更新新的版本;
我们调研了当前大数据平台的发展状况,对于我们这样体量很大的平台来说,自主研发将极大节约使用商业版的license成本,因此我们准备走上大数据平台核心技术自主研发的道路。
当时设计的架构如下,(这是去年画的架构,现在架构已经有很多优化,后面有空了我会画一个新的):
image
其中数据基础能力平台,我们选择基于围绕当时最新的社区版Apache Hadoop3.2 、Spark 3.0以及Hbase 2.2版本进行迭代优化,这样做除了很多性能提升以及新特性之外,最重要的一点是它支持滚动升级 ,代表着我们以后可以随时在用户无感知的情况下升级集群,以后跟着开源社区混了,也能更方便地把我们自己的优化与社区共享;目前已经稳定运行一年,扩容到单集群1W+理论上没有问题,这里暂不细说;
而运营与运维平台,在调研了市面上已有的开源产品后,我们决定自己从头研发,因为已有的包括Ambria等优秀的软件,在我们大规模平台的场景下支持得并不好,这里也暂不细说;
比较头痛的是数据开发与服务平台的技术选型;

选型

首先,我是非常不赞同上来就自己重新造轮子的,我们人力资源是有限的,整体技术水平也是处于学习阶段,并且时间紧张。放眼望去,已有的数据开发与服务相关的工具还是蛮多的,比如Apache Zeppelin,Apache Livy,Hue等等我们都有用过,我们也曾经基于Apache Livy封装研发了一个作业开发打包提交的工具。但是,这些工具还是相对比较割裂的,因为对于用户来说,如果各个功能都是非常割裂的话,用户体验是比较差的,为了提升用户体验,也为了增加大数据平台的安全性,我们希望能打磨一款能“ 统一作业提交入口的,一站式的大数据开发与服务平台”,在调研了业界众多产品后,我们最终选择基于WDS开源社区的开源产品进行打造,原因如下:

  1. Linkis满足了我们 统一入口 的想法,linkis本身从设计上就是为了统一入口而设计的,并且已经经过了微众银行自己内部业务的验证,统一入口能够屏蔽底层的复杂性,且保证持续智能优化的可能性;并且Linkis对资源的管控非常细粒度,这是更加安全可控的;
  2. DSS满足了我们 一站式数据开发与服务 的想法,DSS本身是可扩展的,并且工作流开发与底层的具体调度是解耦,这为我们未来的想法提供了很多可能性;
  3. 包括其它Exchangis、Schedulis……等等组件都是我们非常需要的,并且集成地很好;
  4. 完全基于Apache 2.0的协议完全开放开源,并且开源社区非常活跃,现在光微信群规模成员就已经到了接近3000人,这基本涵盖了国内想做或者使用这一块的大部分人了吧;
  5. 社区的发起人员以及维护运营的人员也是非常open,非常乐于助人,我们也是被这个氛围所打动,希望我们也能参与进来,一起把社区向开源开放的氛围逐步维护起来;
    所以最终,我们选择了基于WDS社区,针对我们的需求联合共建,打造大数据开发与服务平台,代号鲁班。
    应用

与基础能力平台的整合

首先,我们把linkis与我们自己的大数据基础平台进行了整合适配,由于Hadoop与Spark等本身向下兼容性是比较好的,所以在适配Hadoop3.2和Spark3.0期间,并没有太多要大改的地方,但是还是改动了一些,需要注意的是Spark3.0以后只支持Scala2.12,要用scala2.12来编译与spark相关的包,还有就是spark3.0后有一些API会有变动;
也做了一些二次开发,一些已经贡献到社区,暂不细说;
我们内部也会定期组织源码学习与分享活动,暂不细说;

支撑内部的经营分析项目

后面我们将大数据PaaS平台应用于内部的经营分析项目,该项目的目的在于收集我们所有的数据,然后用大数据的方法进行分析,来为领导层、市场部、财务部、销售部门等等提供决策分析;
在内部支撑的过程中,最重要的不仅是技术,更是内部业务数据团队的支持。虽然DSS这一套总体是比较稳定的,但是在适配或搭建的过程中,或多或少总会遇到一些问题,而这时候,数据团队的理解和支持对我们平台团队来说是非常重要的。在我们共同的努力下,内部数仓项目的数据团队基于鲁班的使用,极大地提升了数仓建设的开发效率。当然,在内部数仓团队的深度支持下,我们也发现了很多之前没有发现的隐藏bug,也为我们以及社区提了很多宝贵的建议,我们也将长期持续支持;
这里给我们的启示是,新平台的推广,真的需要业务侧与数据侧的理解与支持,以及持续的反馈,最终才能达到共赢;
image

核心大平台的应用与升级

我们内部有一个规模非常大的老平台,我们最近一直在进行平台的迁移工作,将商业版CDH平台的业务逐步迁移到新建设的大数据平台上,其中包括数据迁移、脚本从hive升级到Spark3.0,以及整体数据处理架构的升级等等,而其中最麻烦的,就是如何把旧的所有业务迁移到基于DSS构建的鲁班上;
这一块是比较复杂的,因为上面有成千上万的脚本在跑,是基于商业版的调度进行调度的,而旧的调度,与业务是紧耦合的,同时也有单点的风险。由于种种原因,这一块一直都没有动,直到平台迁移其它部分搞得差不多了,我们才准备回过头来看看如何处理这块难坑的骨头;
这一块我们的思路如下:

  1. 将旧调度的DAG,通过代码的方法,自动化地改为DSS自带工作流的DAG的格式;
  2. 将所有调度的脚本,都同时在测试环境原封复制一份,然后把所有的执行命令注释掉,只用虚拟的print语句来打印;
  3. 在测试环境寻找几十个具有代表性的DAG,然后进行测试,观察比对,来验证DAG转化的正确性;
  4. 然后逐步在生产环境完成调度任务的迁移,由于这一块涉及到了生产,因此我们比较谨慎,将长期持续做这件事情;
    调度升级只是核心大平台应用的一步,让用户逐步通过鲁班来统一入口使用,是一件漫长的过程。为了满足用户的定制化需求,我们开发了一个新功能,就是把大数据平台的客户端放到了K8S之上,然后把它引入到鲁班里,这样的话,每个用户就可以在页面上申请自己专属的安全的客户端,来进行一些目前鲁班不支持的功能,同时又能满足统一入口的要求;

某重要的民生项目

我们同时也会承接一些重要的外部项目,我个人认为这是非常有意义的,能够让全国各地的人们享受大数据AI带来的便利,这是让我们整个团队的兄弟们都觉得倍有激情的一件事情,尤其当一个项目做到朋友家里那边去的时候,跟亲人朋友讲起来你现在用得这个就是我们在做得,也倍有成就感;
我觉得,作为国家新基建号召的一员,我们作为央企,更要把大数据AI能力赋能到那些偏远的地方,我就是偏远的地方来的,高铁修到我们家门口的时候,我是能明显得感觉到国家的力量,我觉得这个比收入更有成就感,简单附图如下:
image

把鲁班搬到公有云上

我们在鲁班内部应用的过程中,由于有着不错的反响,领导提出了是不是可以把鲁班改造成公有云版本,来支撑我们的天翼云诸葛AI平台。经过讨论后,当即就拍板做,改造前后花了2个月的时间,目前准备上线。
天翼云作为规模全国前三(也有说法是前四)的云服务商,也作为国家队的一员,在全国各个省都有很大的资源池,具备丰富的硬件资源,因此未来将大数据搬到云上是必然的趋势。只不过一直想着等权限这一块成熟了,产品化搞好点,明年再抽出时间来做,不过本着用户是上帝的原则,我们也就加班加点直接上阵了;
为了将鲁班上公有云,我们做了很多事情,这里简单列一下,后面会有同学写文章详细写这一块:

  1. 对接天翼云的单点登录做了一些改造;
  2. 安全全面升级,我们安全团队测出来了几百个安全漏洞,我们一个一个都改了,包括http升级为了https;这一块后续我们有空了都会提交给社区(最近年底实在太忙了);
  3. 域名收敛工作,现在的DSS的域名是不收敛的,这样也会有安全隐患,因为要对外开通很多很多端口,所以我们对域名进行了收敛(后续也会贡献);
  4. 用户资源自动开通;我们专门开发了一个独立的服务,使得用户的资源可以自动开通或者销毁,包括Hadoop、hive、以及dss中各种用户等,当然这一块最好是需要底层的Hadoop支持定额的容量分配;如果社区里其他伙伴也有类似的需求,我们后面有空了也会将它整理然后贡献开源出来;
  5. 为了使得部署更加方便,我们将自动化部署脚本改造成了Ansible脚本的形式,便于分布式部署,这一块也在各种场景下验证,持续优化迭代;
  6. 当然还有蛮多事情,总之,现在放到公有云上只是第一步,后面还有很多地方需要持续迭代;

天翼云诸葛AI平台发布

在2020天翼智能生态博览会天翼云论坛上,我们发布了天翼云诸葛AI开放平台。对于各行各业绝大多数用户来说,他们需要的不是一个大数据平台,大部分人都不会用,而是真正能帮上用户解决一个个痛点问题,因此大数据和AI是不分家的,天翼云诸葛AI平台中的大数据平台部分就是我们支撑的,同时我们也支撑了AI算法的底层算力,比如 GPU0.1颗粒度调度等,这里暂不细说;
上图如下:
image

附一个新华网新闻链接:http://www.gd.xinhuanet.com/newscenter/2020-11/10/c_1126722071.htm

后续展望

我们后续希望基于WDS继续优化鲁班,要做得事情非常多,简单列出如下:

用户友好度提升

我们发现,现在的鲁班,对于不懂大数据开发,或者只了解简单的数据分析,不懂大数据的人来说,是不太好用的,会感觉比较乱,是新手不友好的,所以我们内部拉起了一个突破光明顶的小项目,目的是从用户的角度,提升产品的新手友好性,增加一些新手引导,给用户一个更加丝滑的使用感受,这一块后续也希望跟社区沟通,一起提升用户的友好度;

多租户问题

现在的鲁班,对于多用户的管理这一块支持并不是很好,所以这一块也希望跟社区沟通协同开发,一起把它持续设计迭代地越来越好;

调度升级

目前的调度能力,对于我们内部动辄成百上千个节点的DAG这种复杂场景来说是不太能满足的,比如全局展示,重跑等等,这一块我们还会继续调研研发;

数据治理相关工具

我们现在数据治理相关能力比较欠缺,目前开源的一些其它产品整合进来感觉怪怪的,所以还是需要跟社区一起探讨,优化,开源,共同验证,在先进的方法论的指导下,逐步迭代经得起考验的数据治理工具;

数据集成能力的完善

现在Exchangis已有的数据接入能力,是不能够完全满足我们所有的场景的,我们将开发API方式的数据接入,流式数据的接入能力,并且提供在公网下能够安全传输的能力,然后经过验证后,持续反馈到社区;;

其它社区已有工作的跟进与参与

比如支持Presto、ES、Kylin等等能力,以及Linkis1.0,DSS1.0等的强大能力,以及Linkis on K8S等的强大能力,这些都是我们非常感兴趣,将持续跟进的,基本上有发版后我们第一时间就会验证。如果后面有机会了,我们也希望参与其中,在大佬统一的规划下,为社区贡献一份自己的力量;

关于开源

最后,简单聊一下对开源社区的看法:
首先,我个人是非常热爱开源社区的,并且一直在仰望各位大佬,包括Hadoop、Spark等项目,我也常会去它们社区看看,学习学习。我认为开源社区的存在,极大地推动了人类科技的进步,像Linux、Hadoop、K8S、TensorFlow等等开源项目的核心人才,在我心中对人类的贡献,其价值是远大于喜欢打架的总统的;希望能多写点代码,被存储在北极的冰川下。
其次,开源的力量是势不可挡的。Linux开源,大家后面开发都用Linux,一定程度对Unix造成了影响,Hadoop、Spark、K8S、TensorFlow等等开源后,逐步也成为了大家使用和迭代的标准,极大地推进了相关领域的发展,顺应着开源软件潮流的公司,都收益良多。现在无论是国外的 Google、Facebook、亚马逊、IBM,还是国内的腾讯、阿里、华为等等,都在积极的拥抱开源。连曾经闭源的代表微软,都收购了 Github,积极地加入开源队伍。我们技术起步较晚,所以更要积极地加入开源的队伍中去,拥抱开源;
大数据开发与服务平台这一领域,目前国内微众发起的WDS社区是做得最好的,无论是技术架构层面还是社区运营层面。我们天翼云目前对于开源软件更多是单方面使用,贡献还是比较少的,一个是实在是忙没时间提交太多代码,一个是真的目前还没啥能拿的出手的特别亮眼的东西。希望后面能有一定的时间,我们能积极的参与到开源项目当中,从使用开源软件进步到反哺开源社区的全新阶段。也希望有更多的公司加入WDS社区,一起使用迭代,多多交流,只有交流使用多了,才会更加稳定,有更好的设计以及理念,才会越来越懂用户,我为人人,人人为我,大家共同受益;
最后,我也在思考一个问题,就是大数据开发与服务平台领域未来将会有很多很多好用的工具开源出来,包括个人开发的以及公司开发的,我们电信的一个优势是有很多的需求和场景可以迭代验证,而作为一名开发工程师,我是非常尊重软件的原创者的,简单的拿来主义让我有点不好意思。作为天翼云的一员,我也会积极思考,以后如何建立一个良性循环的生态,积极引进这些优秀的开源软件,与它打磨共建,赋能千行百业,为用户创造价值的同时,让开源软件的开发者,能从中获得更多实打实的收益。

最后,如果对大数据平台建设感兴趣的做事积极主动的小伙伴,或者对业内常见的大数据产品研究地比较透彻的产品小伙伴,欢迎联系我,加入我们团队,一起把大数据平台搞起来,把开源的代码搞得越来越稳定,越来越优秀

部署好后出问题

访问登录页出现 JSON Parse error: Unrecognized token '<', 登录时也是这个提示

【有奖征文】DSS + Linkis + Qualitis 在哗啦啦数据中台的应用与实践

一、应用场景

哗啦啦数据中台项目旨在为餐饮行业提供一站式的数据处理平台。满足数据接入、数据清洗、数据加工、质量校验、数据服务、数据输出的数据应用开发全流程场景需求。
数据中台整体架构如下:
image
其中Qualitis应用于数据质量系统提供规则引擎。linkis为数据质量以及开发系统调试功能提供计算引擎支持

二、解决的问题

1、jdbc引擎如执行运行小时级的hive sql时。元数据库中的任务状态不更新问题
2、jdbc引擎打通数据源管理系统,支持数据源配置
3、打通中台项目统一认证服务
4、支持参数解析(包括时间变量)
5、Qualitis增加告警以及定时调度功能

三:DSS在哗啦啦的最佳实践

1、开发平台系统

开发系统集成了大数据主流工具,通过拖拉拽dag生成数据处理流程,集代码开发、代码审核、任务监控告警、工作流版本、权限控制等功能。能满足用户大部分数据处理场景,开箱即用,简化用户开发流程,降低技术门槛。
在最初,用户写完脚本后,验证脚本正确性需通过提交代码审核之后真正执行任务,或者用第三方开发工具验证如zeppelin。用户反愦验证流程很不便利。在引入Linkis以及DSS后,开发平台基于DSS的前端做了二开,使用了他的脚本运行功能,提交任务到Linkis,并推送任务进度以及运行日志到浏览器,简化了用户的验证流程,优化了用户交互。完善了开发系统之前缺少的调试功能。
image
开发系统部分界面展示
image
image
image
image

2、数据质量系统

在开发系统上经过的 接入、清洗、加工、输出 一系列处理后,用户需要对数据质量进行评估,此时可用到我们的数据质量系统。
我们使用Qualitis作为质量的后端服务。前端重新开发了一套符合中台UI风格的系统。在Qualitis基础上,我们增加了定时调度以及告警,满足用户即时检测或者定时调度检测,并支持配置告警策略,即时通知到用户检测结果。
image
image
image

The End

Linkis是非常优秀的开源项目,很适合应用于业务系统层级提交任务的统一入口,屏蔽底层集群细节。非常期待1.0版本的混算和多集群支持。相信Linkis会走越来越远

【有奖征文】DSS在程序化广告中应用实践

一. 应用场景

珑玺科技的大数据管理平台DMP依托于Hadoop的下的HiveSpark等工具链展开, 之前的相关任务都是通过shell脚本的通过定时任务开展, 随着业务需求越来越复杂, 所沉淀的脚本越来越多, 变得难以维护, 增加了后续迭代和项目成员之间的沟通成本. 这时候我们看到微众刚刚开源的这个项目, 可以利用整个平台完成大数据的以下几个工作流:

  • 数据交换
  • 数据开发
  • 数据质量
  • 数据可视化
  • 数据发送

完成从数据的收集, 去重, 清洗, 规整等工作流, 使整个数据治理中的几乎所有工作, 可以通过 DSS 得到顺利流转, 提升了大数据的数据管理和分析的体验.

我司的DMP平台主要基于Hadoop 2.6来做集成的, 主要功能集中在元数据处理, 数据的去重,清洗, 和标准化. 实现平台的OLAP的工作流, 最终实现的业务需求, 主要集中在4个方面:

  • 元数据管理
  • 用户标签沉淀
  • 反作弊数据分析
  • 相关BI报表输出

二. 解决的问题

多数据源支持

平台支持主要的数据源有MySQL, Hive, 甚至支持最新的NewSQL平台分布式数据库TiDB, 和其他第三方数据数据格式. 使用此平台前, 需要做频繁的数据转换操作, 上下文切换成本比较高; 通过引入此系统之后, 可以通过平台集成的数据交换模块, 非常平滑的引入各种数据源进行交叉分析, 提升了多数据源管理的处理效率.

数据脚本开发

Picture1

DSS部署之前平台的各种任务都是编写shell脚本, 来实现对这个大数据看分析的流程, 随着业务的迭代, 和需求的增多, 脚本的可维护性变得越来越差, 而通过数据开发Scripts模块, 完全兼容hql, MySQL, PySpark, 几乎可以重用之前的大部分脚本, 而且执行过程和结果可视化, 提升了数据分析的工作效率.

Picture2

三. 最佳实践

阿里云OSS数据读取的问题

我们大部分数据都存储在阿里云对象存储OSS中, 所以需要另外配置读取OSSjar文件
同步阿里云OSSjars包到linkislib目录下面

附: CDH集成阿里云OSS说明

cd /opt/linkis
find . -name "lib" -print | awk '{printf("cp /home/hadoop/aliyun-oss-jars/*.jar /opt/linkis/%s/\n", $1)}' |sh
find . -name "lib" -print | awk '{printf("cp /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/jars/hadoop-oss-cdh-5.14.4/httpclient-4.5.2.jar /opt/linkis/%s/\n", $1)}'
find . -name "lib" -print | awk '{printf("cp /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/jars/hadoop-oss-cdh-5.14.4/httpcore-4.4.4.jar /opt/linkis/%s/\n", $1)}'
find . -name "lib" -print | awk '{printf("cp /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib/jdom-1.1.jar /opt/linkis/%s/\n", $1)}'
$ cd aliyun-oss-jars/
$ ls -l
total 2932
-rw-r--r-- 1 hadoop hadoop 116337 Jan 2 10:59 aliyun-java-sdk-core-3.4.0.jar
-rw-r--r-- 1 hadoop hadoop 788137 Jan 2 10:59 aliyun-java-sdk-ecs-4.2.0.jar
-rw-r--r-- 1 hadoop hadoop 215492 Jan 2 10:59 aliyun-java-sdk-ram-3.0.0.jar
-rw-r--r-- 1 hadoop hadoop 13277 Jan 2 10:59 aliyun-java-sdk-sts-3.0.0.jar
-rw-r--r-- 1 hadoop hadoop 562719 Jan 2 10:59 aliyun-sdk-oss-3.4.1.jar
-rw-r--r-- 1 hadoop hadoop 71074 Jan 2 15:12 hadoop-aliyun-2.6.0-cdh5.14.4.jar
-rw-r--r-- 1 hadoop hadoop 736658 Jan 2 15:10 httpclient-4.5.2.jar
-rw-r--r-- 1 hadoop hadoop 326724 Jan 2 15:10 httpcore-4.4.4.jar
-rw-r--r-- 1 hadoop hadoop 153115 Jan 2 15:10 jdom-1.1.jar

同步阿里云的库到DSS:

cd /opt/aliyun-oss-jars/
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-ujes-spark-enginemanager/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-ujes-hive-entrance/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-ujes-spark-entrance/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-resourcemanager/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/eureka/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-ujes-jdbc-entrance/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/modulebak/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/module/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-publicservice/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-bml/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-ujes-python-enginemanager/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-ujes-python-entrance/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-gateway/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-ujes-hive-enginemanager/lib/%s\n", $2, $2)}' | sh
find . -name "*.jar" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/%s /opt/linkis/linkis-metadata/lib/%s\n", $2, $2)}' | sh
cd /opt/linkis
find . -name "lib" -print | awk -F'/' '{printf("ln -s /opt/aliyun-oss-jars/hadoop-aliyun-2.6.0-cdh5.14.4.jar /opt/linkis/%s/lib/hadoop-aliyun.jar\n", $2)}' 

Scriptis的右侧刷不出来数据,一直在刷新中

步骤1: 修改文件

vim /home/hadoop//Linkis20191218/metadata/target/classes/com/webank/wedatasphere/linkis/metadata/hive/dao/impl/HiveMetaDao.xml
<select id="getDbsByUser" resultType="java.lang.String" parameterType="java.lang.String">
        <!--select NAME from(
        select t2.NAME  as NAME
        from DB_PRIVS t1, DBS t2
        where (lcase(t1.PRINCIPAL_NAME) = #{userName,jdbcType=VARCHAR}
        OR t1.PRINCIPAL_NAME IN (SELECT ROLE FROM(SELECT r.ROLE_NAME AS ROLE, u.PRINCIPAL_NAME AS USER FROM ROLES r LEFT JOIN (SELECT * FROM ROLE_MAP WHERE PRINCIPAL_TYPE = 'USER') u ON r.ROLE_ID = u.ROLE_ID)AS T where T.USER = #{userName,jdbcType=VARCHAR}))
        and lcase(t1.DB_PRIV) in ('select','all') and t1.DB_ID =t2.DB_ID
        union all
        select t3.NAME as NAME
        from TBL_PRIVS t1, TBLS t2 , DBS t3
        where t1.TBL_ID=t2.TBL_ID and lcase(t1.TBL_PRIV) in ('select','all') and (
        lcase(t1.PRINCIPAL_NAME) = #{userName,jdbcType=VARCHAR} or lcase(t1.PRINCIPAL_NAME) in (SELECT ROLE FROM(SELECT r.ROLE_NAME AS ROLE, u.PRINCIPAL_NAME AS USER FROM ROLES r LEFT JOIN (SELECT * FROM ROLE_MAP WHERE PRINCIPAL_TYPE = 'USER') u ON r.ROLE_ID = u.ROLE_ID)AS T where T.USER = #{userName,jdbcType=VARCHAR}))
        and t2.DB_ID=t3.DB_ID) a
        GROUP BY NAME
        order by NAME-->
        select name from DBS
    </select>

    <select id="getTablesByDbNameAndUser" resultType="map"  parameterType="map">
        <!--select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
        from DB_PRIVS t1,TBLS t2, DBS t3
        where  t1.DB_ID =t3.DB_ID
        and t2.DB_ID=t3.DB_ID
        and lcase(t1.DB_PRIV) in ('select','all')
        and lcase(t1.PRINCIPAL_NAME) = #{userName,jdbcType=VARCHAR}
        and t3.NAME = #{dbName,jdbcType=VARCHAR}
        union
        select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
        from DB_PRIVS t1,TBLS t2, DBS t3
        where  t1.DB_ID =t3.DB_ID
        and t2.DB_ID=t3.DB_ID
        and lcase(t1.DB_PRIV) in ('select','all')
        and lcase(t1.PRINCIPAL_NAME) in (select ROLE_NAME from ROLES where ROLE_ID in (select ROLE_ID from ROLE_MAP where PRINCIPAL_NAME = #{userName,jdbcType=VARCHAR}))
        and t3.NAME = #{dbName,jdbcType=VARCHAR}
        union
        select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
        from TBL_PRIVS t1, TBLS t2 , DBS t3
        where t1.TBL_ID=t2.TBL_ID
        and t2.DB_ID=t3.DB_ID
        and lcase(t1.TBL_PRIV) in ('select','all')
        and t1.PRINCIPAL_NAME = #{userName,jdbcType=VARCHAR}
        and t3.NAME = #{dbName,jdbcType=VARCHAR}
        union
        select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
        from TBL_PRIVS t1, TBLS t2 , DBS t3
        where t1.TBL_ID=t2.TBL_ID
        and t2.DB_ID=t3.DB_ID
        and lcase(t1.TBL_PRIV) in ('select','all')
        and t1.PRINCIPAL_NAME in (select ROLE_NAME from ROLES where ROLE_ID in (select ROLE_ID from ROLE_MAP where PRINCIPAL_NAME = #{userName,jdbcType=VARCHAR}))
        and t3.NAME = #{dbName,jdbcType=VARCHAR}
        order by NAME;-->
        select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
        from TBLS t2 , DBS t3
        where 
         t2.DB_ID=t3.DB_ID
        and t3.NAME = #{dbName,jdbcType=VARCHAR}
    </select>

步骤2: 进入Linkis-20191218/metadata 重新编译

root@cdh04:/home/hadoop/Linkis-20191218/metadata# mvn clean 
root@cdh04:/home/hadoop/Linkis-20191218/metadata# mvn install

步骤3: 确认相应的包已经更新

cp linkis-metadata-0.9.2.jar /opt/linkis/linkis-metadata/lib/linkis-metadata-0.9.2.jar 

步骤4: 重启所有linkis-metadata微服务

错误码: errCode: 10905

错误日志

[2019-12-24 22:47:39.120 [INFO ] [qtp296594285-26] c.w.w.d.s.l.LockAspect (63) [around] - 调用方法:addRootFlow
2019-12-24 22:47:39.120 [INFO ] [qtp296594285-26] c.w.w.d.s.l.LockAspect (72) [around] - projectVersionID为:1
2019-12-24 22:47:39.135 [INFO ] [qtp296594285-26] c.w.w.d.s.l.LockAspect (80) [around] - 执行过程出现异常 
com.webank.wedatasphere.linkis.httpclient.exception.HttpClientResultException: errCode: 10905 ,
		desc: URL http://127.0.0.1:9001/api/rest_j/v1/bml/upload request failed! ResponseBody is 
		{"timestamp":"2019-12-24T14:47:39.132+0000","status":404,"error":"Not Found","message":"/api/rest_j/v1/bml/upload","path":"/api/rest_j/v1/bml/upload"}. ,
		ip: cdh04 ,port: 9004 ,serviceKind: dss-server
		]

解决方案:
到部署目录linkis删除下面所有微服务的包

find . -name "jasper-*" -print | awk  '{printf("mv %s %s_del\n", $1,$1)}' |sh

软件版本:

  • Hadoop 2.6.0
  • MySQL 5.7.26
  • Hive 1.1.0
  • HBase 1.2.0
  • Spark2 2.2.0

 

【有奖征文】DSS实践

技术选型

公司所处的是金融服务类的行业,需要给不同银行/公司实施一整套的项目。所以筛选了一圈发现DSS是跟公司场景最吻合的。

改造过程

(1)单点登录
因为公司的服务方向是基于业务会有很多系统一整套的输出,所以第一个改造过程就是让DSS接进公司的单点登录系统。
第一步就是把dss整套先都搭建起来,过程很酸爽 :-)但排除各种问题完整搭建起来后还是非常有成就感的。
在整合过程中顺便提了个issues:apache/linkis#489

(2)脚本加密
因为公司规划里不想让客户知道具体执行了什么脚本,所以对所有的存储文件、脚本都做了加密,只有在界面跟底层引擎执行时才能看到具体真正的执行逻辑。

(3)适配Oracle、pg
默认DSS jdbc和visualis对 Oracle/pg的适配有些bug,因为后续的输出公司很多都会有极大可能用到Oracle,所以针对这个改动了一些

(4)界面适配
根据公司系统的风格,重新设计了首页,并且给workspace增加了删除的功能

(5)引入Schedulis
默认的DSS全家桶安装包里只有standalone的azkaban,不符合输出的要求,而且发现Schedulis也开源了,Schedulis还是添加了很多实用功能的,比如web节点的高可用,支持用户管理/exec的用户分配管理等等,所以用Schedulis替换了原先 standalone 的 azkaban。过程发现 Schedulis 对 azkaban 的改动还是蛮大的(webank牛逼)

(6)优化一键部署脚本
原先的一键部署脚本因为这里引入了Schedulis,并且去掉了qualitis,所以有稍稍优化一下,也优化了一些再分布式部署时的一些小问题

(7)所有数据文件接入HDFS
因为生产高可用的场景下DSS的脚本文件放在本地物理机上的话可能会造成脚本文件的丢失,所以修改点前端代码将脚本文件允许保存到hdfs
...

后续改造方案

(1)优化一键部署脚本,使得能够一键部署分布式、高可用的DSS
(2)尝试改造 linkis-gateway 成无状态的服务
(2)合并服务,现在DSS整套搭建起来服务还是很多的,维护起来不是很方便(社区1.0版本好像有这个方案)
(3)适配华为大数据平台:在实施过程中因为很多公司都是基于华为大数据平台的,目前在适配过程中发现了很多问题导致用不了hive、spark engine,被迫转向用jdbc entrance去连Oracle 呜呜呜
(4)整合公司的API服务网关
(5)整合Exchangis数据交换平台

2023年第一期有奖悬赏需求说明

一、 需求描述

  1. WeDataSphere各开源组件版本要求如下:
组件名 Apache Linkis DataSphere Studio Schedulis Qualitis Exchangis Visualis Streamis MYSQL JDK
版本号 1.3.1 1.1.1 0.7.1 0.9.2 1.0.0 1.0.0 0.2.0 5.1.49 1.8
  1. 底层计算存储引擎的版本要求如下,每个版本的适配,只接受一个Docker镜像成品:

社区常用版本如下,每人可认领一个版本:

a). CDH5.12.1 版本

组件名 Hadoop Hive Spark Flink Sqoop Trino
版本号 2.6.0-cdh5.12.1 1.1.0-cdh5.12.1 2.4.3 1.12.4 1.4.6 371

b). CDH6.3.2 版本

组件名 Hadoop Hive Spark Flink Sqoop Trino
版本号 3.0.0-cdh6.3.2 2.1.1-cdh6.3.2 3.0.0 1.12.4 1.4.6 371
  1. 编译规范

请参照 Linkis 版本适配 对 Apache Linkis 进行编译,其他组件无需编译,可直接使用官方安装包。

  1. 组件部署规范

● 安装部署目录规范

├── wedatasphere --根目录

│ ├── sbin --wedatasphere全家桶一键启动和一键停止的脚本目录

│ │ ├── start-all.sh --wedatasphere全家桶一键启动脚本

│ │ ├── stop-all.sh --wedatasphere全家桶一键停止脚本

│ │ ├── wedatasphere-env.sh --wedatasphere全家桶环境变量配置脚本

│ ├── install --wedatasphere各组件安装包的存放目录

│ │ ├── --LinkisInstall Linkis安装包根目录

│ │ ├── ……

│ ├── config --wedatasphere各组件配置文件的存放目录

│ │ ├── linkis-config --Linkis 配置文件根目录

│ │ ├── ……

│ ├── logs --wedatasphere各组件日志文件的存放目录

│ │ ├── linkis -- Linkis日志文件根目录

│ │ ├── ……

● 环境变量规范

wedatasphere-env.sh 可支持用户配置 Hadoop、Hive、Spark、Flink、Sqoop、Trino等引擎的环境变量,具体如下:

HADOOP_CONF_DIR=
HADOOP_HOME=
YARN_RESTFUL_URL=
HIVE_CONF_DIR=
HIVE_HOME=
HIVE_META_URL=
HIVE_META_USER=
HIVE_META_PASSWORD=
SPARK_CONF_DIR=
SPARK_HOME=
FLINK_HOME=
FLINK_CONF_DIR=
FLINK_LIB_DIR=
SQOOP_HOME=
SQOOP_CONF_DIR=
HCAT_HOME=

● 文档规范

文档尽量以图做说明,少用大段文字。

文档种类要求如下:

文档名称 安装部署文档 Demo使用文档 常见问题文档 升级指南 开发文档
文档内容 用于指导用户安装部署WeDataSphere全家桶 指导用户如何使用全家桶已有的Demo 安装过程中,可能出现的常见问题 指导用户如何只升级或替换WDS的某个组件 1. 给出目录层级结构解释
2. 给出启动逻辑
3. 如何新增一个全新的组件

● Demo规范

i) 官方将提供可导入的 DSS Demo项目,请在打镜像包之前,将Demo导入并保证Demo可正常执行。

ii) 官方将提供Scriptis Demo脚本,请在打镜像包之前,将Demo 脚本导入并保证可正常执行。

iii) 官方将提供 Streamis Demo 流式应用,请在打镜像包之前,将Demo 流式应用导入并保证可正常执行。

● Docker容器制作规范

i) 请尽量保证Docker容器包的总大小不超过12G;

ii) 请将 /wedatasphere 整个全家桶目录制作成一个容器镜像;

iii) 各组件配置的文件路径请使用相对路径,确保各组件可正常使用;

iv) 由于Linkis依赖底层Hadoop、Hive、Spark等组件,请注意提供Hadoop、Hive、Spark等底层计算存储引擎的配置文件的挂载规范和环境变量配置规范。

二、验收标准

  1. 文档齐全且质量高。

  2. 可按照安装部署文档,在30分钟内完成镜像的部署、启动和Demo的基本使用。

三、整体流程

1、和社区工作人员确认任务、接受任务;

2、社区工作人员在WeDataSphere项目,同步创建新的branch,并创建代码提交目录;

3、社区伙伴完成镜像的制作和验证后,提 PR,上传相关源代码和文档;

4、社区工作人员验证镜像包,验证无误后,合并 PR;

5、正式发布镜像包,发放奖励,进行社区宣传等。

四、时间规划

2月15日-2月22日 认领及需求沟通阶段
2月22日-3月08日 开发阶段
3月08日-3月22日 测试验收阶段

【有奖征文】DSS的CICD实践

DSS(DataSphereStudio)的实现强依赖于Linkis计算中间件,dss包含6个,而底层linkis需要部署18个服务,所以一般基于dss二次开发,关键就是对linkis的hadoop集群做适配,以及超多的微服务导致部署的问题(工作量大,服务间存在依赖,容易出错)。

本文主要的关注点是如何将dss应用于生产环境并采用gerrit + jenkins + ansible + docker实施cicd,实现对linkis和dss的自动化部署,封装每个微服务在不同运行环境的配置和启动脚本。

关于DSS

DSS(DataSphereStudio)是一个一站式数据应用开发管理门户,基于插拔式的集成框架设计,基于计算中间件Linkis实现。

Linkis部署结构

linkis总共18个微服务

Linkis服务列表

  • eureka:注册中心
  • linkis-gateway:网关
  • linkis-resourcemanager:资源管理服务
  • linkis-dsm-server:数据源服务
  • linkis-mdm-server:元数据管理服务
  • linkis-metadata:元数据服务
  • linkis-bml:物料库
  • linkis-cs-server:统一上下文服务
  • linkis-publicservice:公共服务(variable,database,udf,workspace等)

ujes 统一作业执行引擎

  • linkis-ujes-hive-enginemanager
  • linkis-ujes-hive-entrance
  • linkis-ujes-jdbc-entrance
  • linkis-ujes-python-enginemanager
  • linkis-ujes-python-entrance
  • linkis-ujes-shell-enginemanager
  • linkis-ujes-shell-entrance
  • linkis-ujes-spark-enginemanager
  • linkis-ujes-spark-entrance

Linkis部署包组成

每个服务的目录结构都一致,ujes部分会多一些引擎相关的配置:

  • bin:服务启动/停止脚本
    • pid文件:linkis.pid
    • 用户切换脚本:rootScript.sh
    • 启动服务:start-${SERVICE_NAME}.sh
    • 停止服务:stop-${SERVICE_NAME}.sh
  • config:服务配置文件
    • log4j2.xml:log4j日志配置
    • log4j.properties:日志变量
    • application.yml:spring boot配置
    • linkis.properties:linkis服务配置
    • linkis-engine.properties:linkis ujes引擎配置
    • log4j2-engine.xml:ujes引擎log4j日志配置
  • lib:依赖jar包
  • logs:日志文件
    • linkis.log:log4j日志,按天/大小回滚
    • linkis.out:jvm启动日志,每次启动覆盖
    • linkis-gc.log:jvm gc日志,每次启动覆盖

DSS部署结构

  • dss-web:前端服务(可包含visualis-web)
  • dss-server:dss后端服务
  • dss-flow-execution-entrance:工作流执行入口
  • linkis-appjoint-entrance:linkis任务提交入口
  • dss-init-db:仅用于第一次初始化数据库

由于linkis和dss都是微众开源的,dss部署包的目录结构和linkis类似;

DSS部署资源规划

安装linkis+dss服务测试环境,采用4核8G*6台虚机:

  • Server1:linkis-gateway、linkis-publicservice、linkis-cs-server、linkis-dsm-server、linkis-bml、linkis-metadata、linkis-mdm-server
  • Server2:enginemanager(spark、python)、entrance(spark、python)
  • Server3:enginemanager(hive、shell)、entrance(hive、shell)、jdbc-entrance
  • Server4:eureke、linkis-resourcemanager
  • Server5:dss-server、linkis-appjoint-entrance、dss-flow-execution-entrance
  • Server6:qualitis-server、azkaban、visualis-server

测试采用简化部署结构,生产eureke,linkis-resourcemanager需要HA部署;
每个服务的堆大小默认设置为1G;
服务间存在依赖关系,需按顺序启动:比如需先启动eureka,gateway,resoucemanager等基础服务,再启动其他应用层服务;

单机资源够的情况下,测试时可以将ujes都部署在一台服务器;
DataSphereStudio部署图

实际生产环境,根据服务使用人数,具体可参考官方的文档Linkis生产部署参考指南做容量规划。

DSS的CICD流程

主体CICD流程:代码提交到gerrit,review成功后,自动mvn打包,并通过ansible在测试环境发布重启docker容器,同时生成生产环境的部署包;

Linkis自定义编译

自定义hadoop版本,修改linkis根目录和linkis-ujes-spark-engine项目的pom.xml文件
比如修改hadoop到2.6

<hadoop.version>2.6.5</hadoop.version>    
<hive.version>1.1.0</hive.version>    
<spark.version>2.3.0</spark.version>

编译问题

  • shell-enginemanager存在jackson包冲突会导致启动失败,保留2.10.0,其他版本exclude即可;
  • 遇到Assembly is incorrectly configured问题,将useStrictFiltering属性改成false即可;
  • eureke需设置instance-idprefer-ip-addressip-address,不然显示的是docker内部ip,且服务间不能正常通信(使用的默认是docker内部ip);

DSS的部署包准备

  • mvn -N install
  • mvn -Pspark2.3 clean install
  • 将assembly/target/wedatasphere-linkis-{version}-dist.tar.gz解压后挂载到docker中

上述流程可通过

  • JJB(jenkins job builder)实现devops as code,以yaml编写ci流程,ci流程更新后自动触发jenkins任务更新;
  • Jenkins中配置在gerrit trigger,配置不同的hook,让代码更新后自动触发对应的job构建;

DSS的多环境自动部署

在官方的config目录下添加dev、test、prod等配置,按不同部署环境的环境变量配置config.sh和db.sh,并通过docker挂载到容器内;

linkis和dss的目录结构比较规范,做容器化时,只需要参考install.sh中的脚本,拆分成多个entrypoint即可。

注意

  • 官方的脚本针对的是一键部署,ansible集成时,所有的remote操作都可以简化为local操作

DSS的运行日志

  • bin/start-{SERVICE_NAME}.sh脚本然后将SERVER_LOG_PATH改为/var/log/{SERVICE_NAME}SERVER_LOG_PATH并挂载到docker中,以便在容器重启后能够保持日志;
  • 将官方log4j.properties中的logs/linkis.log改为${env:SERVER_LOG_PATH}/linkis.log;
  • gc和jvm日志也可参考log4j日志路径修改;

DSS的Docker镜像

  • CDH环境配置:参考CDH Agent机器配置即可,配置好后需设置HADOOP_HOME,SPARK_HOME,HIVE_HOME等环境变量
  • 根据不同的运行环境,挂载不同的hadoop/yarn的核心site.xml文件;
  • 确保terminal能正常使用hdfs,spark-sql,hive,kinit等服务;

DSS的Docker服务

实现startup.sh ${SERVICE_NAME},通过SERVICE_NAME参数实现启动指定的微服务;
每个微服务的entrypoint脚本主要实现几个步骤:

  • 复制公共模块包;
  • 复制服务压缩包;
  • 删除当前部署目录;
  • 解压服务压缩包;
  • 读取config中的变量,用sed替换spring和log4j等配置文件;
  • 调用服务压缩包bin/start-{SERVICE_NAME}.sh启动服务;
  • 检查服务是否启动成功并打印启动日志;

docker容器的entrypoint示例:startup.sh

# load config and init
RUN_ENV=${DSS_RUN_ENV:=dev}

# eg. /opt/dss/dss-dist
shellDir=${DSS_INSTALL_HOME}/bin
echo "shellDir:${shellDir}"
# tar package path
workDir=$(
  cd ${shellDir}/..
  pwd
)
export workDir=$workDir
echo 'workDir' ${workDir}

# eg. /opt/dss/dss-run
DSS_WORK_HOME=${DSS_WORK_HOME:=${workDir}}
export DSS_WORK_HOME=$DSS_WORK_HOME
echo 'DSS_WORK_HOME' ${DSS_WORK_HOME}

# init directories and log dir
export LOG_DIR=/var/log/$1
mkdir -p ${LOG_DIR}
touch $LOG_DIR/linkis.out
echo "LOGDIR:${LOG_DIR}"

source ${workDir}/bin/common.sh

source entrypoint/$1.sh

echo "tail begin"
exec bash -c "tail -n 1 -f $LOG_DIR/linkis.out"

linkis-gateway的entrypoint示例:linkis-gateway.sh

source ${workDir}/bin/entrypoint/functions.sh
EUREKA_URL=http://$EUREKA_INSTALL_IP:$EUREKA_PORT/eureka/

##GateWay Install
PACKAGE_DIR=springcloud/gateway
APP_PREFIX="linkis-"
SERVER_NAME="gateway"
SERVER_PATH=${APP_PREFIX}${SERVER_NAME}
SERVER_IP=$GATEWAY_INSTALL_IP
SERVER_PORT=$GATEWAY_PORT
SERVER_HOME=$LINKIS_WORK_HOME

###install dir
installPackage

###update linkis.properties
echo "$SERVER_PATH-update linkis conf"
SERVER_CONF_PATH=$SERVER_HOME/$SERVER_PATH/conf/linkis.properties
executeCMD $SERVER_IP "sed -i \"s#wds.linkis.ldap.proxy.url.*#wds.linkis.ldap.proxy.url=$LDAP_URL#g\" $SERVER_CONF_PATH"
executeCMD $SERVER_IP "sed -i \"s#wds.linkis.ldap.proxy.baseDN.*#wds.linkis.ldap.proxy.baseDN=$LDAP_BASEDN#g\" $SERVER_CONF_PATH"
executeCMD $SERVER_IP "sed -i \"s#wds.linkis.gateway.admin.user.*#wds.linkis.gateway.admin.user=$deployUser#g\" $SERVER_CONF_PATH"
isSuccess "subsitution linkis.properties of $SERVER_PATH"
echo "<----------------$SERVER_PATH:end------------------->"
##GateWay Install end

# start and check
startApp
sleep 10
checkServer

DSS的CICD建议

原则是尽量统一行为规范,便于实施约定由于配置,实现运维自动化。

  • install脚本可以按微服务隔离成多个sh脚本,隔离关注点,方便容器化部署,方便社区参与&维护;
  • 项目命名规则不统一:有的驼峰有的全小写(contextservice,resourceManager),改版时可以统一风格;
  • eureka,dss-web和其他服务的install脚本不太一致,比如前缀,命名大小写把所有远程安装的脚本都删除,修改为本地操作即可;
  • dss-web可以添加frontend-maven-plugin插件,不依赖node环境完成前端自动化打包;
  • 可以加入flyway等数据库ddl的版本控制,不然后面数据结构的迭代升级会比较痛苦;

DSS相关术语

  • wds:WebDataSphere,套件名称,包含dss
  • dss:DataSphereStudio,数据平台
  • ujes,Unified Job Execution Services,通用作业执行服务
  • bml:Material Library ,物料库
  • dwc:DataWorkerCloud

最后说一句,微服务如果不用docker/k8s真的是太遗憾了,谁用谁说香啊,官方赶紧出一个吧!

【有奖征文】DSS + Linkis 在aws emr6.1 场景下的部署

痛苦中找出路

公司目前情况:
依托aws托管的大数据集群,基本上用的也都是aws提供的服务。aws起集群真实是方便,也带有一些工具,hue,jupyter notebook等。但是带来的问题也是明显的,比如服务一个个的起来,管理,安全,审核等各个带来等工作量压力就非常大了,业务需求为主,先用hue跑,hue满足不了装一个zepplin跑spark代码,实在不行等开console权限等等,这样下来一直忙于本命,却很容易被别人抓住问题不断等喷,所以选择一个合适的开源平台持续建设才能真正的让大数据工程方面得到长足的发展,DSS+Linkis就是这样一个相对较为完美的选择。

出路不代表平坦路

要想安装上Dss + Linkis 还真不是一件容易的事情,首先aws的各种包都是专有版本,替换包是肯定要的,各种一阵替换(这里就先略去啦,较为繁琐),还有就是发现maven公开地址上根本就没有aws的仓库,有个同事说去机器上下载,我去。。。这变个版本不累趴下啊,还好找到了解决方案:
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-artifact-repository.html,
对应的maven文件中添加,如下:

<repository>
            <id>emr</id>
            <name>EMR emr-6.1.0 Releases Repository</name>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <url>https://s3.ap-northeast-1.amazonaws.com/ap-northeast-1-emr-artifacts/emr-6.1.0/repos/maven/</url>
 </repository>

实际依赖的一些jar的版本,比如 spark,hive ,hadoop等最好都是和线上版本保持一直,并且要在配置文件中根据报错情况加入执行engine需要等lib包路径比如 native包,以及emr特有等一些包等路径等。

升级打怪还是要写代码

我们使用的emr使用的元数据是配置的Glue中来存储的元数据,没有mysql的实现,这块也要兼容起来,首先是要用aws-sdk-glue的包引进来,兼容实现了 hiveDao类似的功能,实现接口和填充数据结构,另外就是在hive引擎里也要对这里做一些修改,就初步解决问题了,如果还要接入Ranger或是hiveserver2里去拉取的话,实现的话还是较为麻烦,暂时没有实现按用户过滤不同数据的实现。

解决问题渐入佳境

但是在spark engine的启动上还是遇到了坑,spark engine在执行的时候一直错误,无法成功,日志也看不出什么问题,于是请请教了微众开源团队的 peacewang@WDS 同学,在peacewang@WDS热心的帮助下解决了问题,同时建议我提交了一个bug issue:https://github.com/WeBankFinTech/Linkis/issues/506, 解决了spark问题后就向前了一大步,可以使用spark3.0的特性了,spark-sql性能肯定有很大提升。

但是在测试pyspark的时候却发现一直遇到py4j gateway的安全性问题,创建不成功,在 peacewang@WDS的提醒下发现PYSPARK_ALLOW_INSECURE_GATEWAY=1并不好用了,绕不开安全策略了。
网上找到了一个办法:

private lazy val py4jToken: String = RandomStringUtils.randomAlphanumeric(256)

  private lazy val gwBuilder: GatewayServerBuilder = {
    val builder = new GatewayServerBuilder()
      .javaPort(0)
      .callbackClient(0, InetAddress.getByName(GatewayServer.DEFAULT_ADDRESS))
      .connectTimeout(GatewayServer.DEFAULT_CONNECT_TIMEOUT)
      .readTimeout(GatewayServer.DEFAULT_READ_TIMEOUT)
      .customCommands(null)

    try builder.authToken(py4jToken) catch {
      case err: Throwable => builder
    }
  }

然后在实例化的时候这样:

gatewayServer = gwBuilder.entryPoint(this).javaPort(port).build()

把对应的token传入到python文件中

cmd.addArgument(py4jToken, false)

在mix_pyspark.py文件中进行对应的处理的时候也加入token:

gateway = JavaGateway(client, auto_field = True, auto_convert = True,
                    gateway_parameters=GatewayParameters(port = int(sys.argv[1]), auto_convert = True, auth_token = sys.argv[3]))

这样就解决了pyspark执行的问题,具体pr:apache/linkis#512

Cannot join WeChat group

I tried to scan the QR code but I cannot join the WeChat group because the no. of users > 100, and I can only join by invitations.

Missing Components

I have tried to deploy WeDataShpere on my local server, but it seems many components, including 工作流,看板系统,EasyIDE. I have searched the entire project and failed to find such components. Check out this screenshot.

image

【有奖征文】DSS结合数据治理应用与实践

DSS应用场景

​ 大数据对于公司的贷前、贷中、模型、风控业务以及客户生命周期管理等各个方面都起到了至关重要的作用,在这个瞬息万变的时代,数据具有时效性,快速高效又安全统一地利用数据是关键。

​ 在未使用DSS前,对业务方来说,作业状态进度并不透明,各业务科室的分析作业没有统一的管理,作业调度发布流程繁琐,发布随意,报表开发复杂,缺乏一个高效的、全流程打通的数据分析平台;对于数据团队来说,管理调度任务非常繁琐,需要耗费大量人力协助业务方开发。

​ DSS是一个可以解决上述问题的统一数据分析平台,DSS为各个业务科室提供了自助代码开发,任务测试,查看任务状态,取数、创建工作流程、跑批调度以及数据可视化等一站式服务,为业务开发人员免去了很多不必要的沟通联调时间,也让数据分析任务的发布更加流程化,调度发布操作也更加简便和人性化。DSS内置了数据交换,数据开发,数据数据质量,数据可视化和数据发送等功能,业务开发人员通过DSS完成几乎所有的任务。不仅如此,DSS上层支持多种计算引擎,方便业务方开发,也可以很方便地定制和引入新的计算引擎,同时在下层又能兼容多种数据源,对现有集群环境很友好。

DSS解决的问题

​ 在萨摩耶数科, DSS作为统一数据分析平台被推广到各个业务科室使用,一些旧任务也开始逐步迁移DSS平台,通过DSS平台来管理。DSS主要解决了以下问题:

​ 1.数据开发分析效率低,需求上线周期长

​ 2.人力投入大

​ 3.数据发现效率低,存储成本高

​ 4.无法提供统一的数据服务

​ 5.任务缺乏统一管理

​ 6.可视化报表开发效率低

DSS最佳实践

​ Linkis、Visualis、Schedulis和工作流在整个大数据平台中的使用频率很高,业务方直接使用这些组件来完成开发、测试、调度和上线工作。
​ 我们在开源DSS基础上修复了一些影响业务使用的BUG,同时也做了一些定制化开发。我们定制开发了数据治理模块并集成到DSS平台,通过数据治理模块,各个业务科室可以清晰地看到当前自己科室开发库的空间配额、文件数配额:
1
​ 可以看到当前科室开发库中各个表的元数据信息,包括表大小、文件数、目录数、平均文件大小、是否分区、创建时间、更新时间和最近一次访问时间:
2
​ 点击表或搜索一张表后,可以显示详细血缘关系、表结构、表类型、分区等信息,如下:
3
​ 通过数据治理模块,业务方可以很方便地了解表的相关信息以及业务含义,自动生成数据地图、数据字典,降低数仓数据的学习成本。后续也会继续集成列级血缘,减少人工查看字段计算口径的时间,提高业务理解和业务开发效率以及实现基于血缘的自动化跑批失败重试机制。

​ 目前我们使用的统一数据分析平台版本是0.9.3,仅仅支持离线跑批任务,我们又在此基础上集成开发了实时计算平台,业务方可以以SQL的形式发布实时任务,并自动生成Flink作业提交执行。

​ 我们也计划根据公司业务迭代报表展示和数据可视化模块,实现按业务分类、按维度选取的拖拽式报表生成功能。

带来的业务价值

  1. 提高了数据分析效率,分析任务上线速度加快
  2. 节约了人力成本
  3. 为公司各业务科室使用数据提供便利的一站式服务
  4. 各业务科室能够自行管理各自的业务代码、随时查看任务状态
  5. 业务开发无需关注底层具体细节
  6. 报表开发效率提高

期待的功能与改进

dss+linkis是国内优秀的开源项目,微众的小伙伴的努力有目共睹,我们对linkis+dss的实践场景和使用前景非常看好,但也有一些对它的期待:

  1. 期待添加实时相关功能,集成一个基于Flink的,集SQL编辑、测试、发布、上线于一体的实时计算平台

  2. 类似神策的自助分析功能

  3. 期望相关BUG修复:JDBCEntrance假死不提交任务不更新任务状态、空指针问题;Azkaban的WebServer和Executor部署存在高耦合;CodeParser解析代码报错;调度任务失败时无关键错误信息给定位问题带来不便,建议提交任务到Yarn时可以在日志中看到相应的ApplicationID或JobID;修改EngineManager并行度MaxRunningJob不生效;与实际执行任务相关的进程间耦合性较高,高可用性较低,例如GATEWAY,PublicService,Azkaban等,任何服务意外退出都会导致正在运行的任务全部挂掉,希望能对重要组件做高可用和智能重试机制;Azkaban上Kill任务后状态一直为Killing,后台数据库不更新状态;Azkaban没有做权限隔离,用户可以随意查看和操作他人的调度;Gateway中netty发送Websocket大小限制65536字节,用户脚本内容过多时会导致无法加载内容(Issue: #476);任务管理器和引擎管理器显示异常,有时需要强制刷新才能正常显示;Visualis可视化报表对维度排序会报错;

  4. 开通登陆账号步骤繁琐,建议增加注册或自动化建账号功能

linkis 0.9.3
hadoop 2.6.0
hive 1.2.1

【有奖征文】数据开发平台集成linkis分享

应用场景:
公司的数据开发团队目前使用的是hue和notepad记事本开发脚本,开发较落后,需要在不同的环境hue界面测试开发脚本,没有统一的界面进行开发维护,而且出错率高,没有自动提示和管理。目前公司数据的处理方式是先用记事本把脚本写好,然后都不同的hue环境测试开发脚本运行结果是否一致。然后在上传脚本到调度平台的脚本目录,并且配置好调度平台调度时间,依赖以及重试次数等配置参数,测试通过后然后在上线的生产环境。此过程较为繁琐,而且脚本不易控制,所以我们使用开发平台与linkis引擎集成,通过用户绑定ldap用户执行不同环境下的脚本查看结果,存储开发脚本规范脚本编写,极大的提高了效率和使用方式。
解决问题:
使用了我们的用户体系与linkis组件,以页面化的形式开发脚本,运行、测试不同执行环境,打包,发布,上线。方便快捷,能够使用多种类型脚本直接操作hive数据,不需要数据导出到本地后再处理,对开发脚本节省开发时间,相比较于以前的方式的管理HDFS中的文件。目前阶段处于分析人员使用linkis引擎开发数据处理脚本,以及使用过程中的问题修复。

使用情况:
目前阶段处于linkis适配公司环境,以及修复使用问题,还未新增功能或者新引擎支持,后续如有会分享出来。
公司的大数据相关环境有专门的团队负责,我们在安装使用的过程中进行了一系列的适配:

大数据环境使用的是CDH 5.14.2,而源码是社区版本,所以根据具体的版本我们进行了重新编译。
HDFS权限被ACL接管,不与Linux系统权限同步,所以直接用hql脚本查询数据时,遇到了HDFS目录无权访问的情况,经过查看源码我们使用ldap用户绑定我们的用户项目组执行脚本,在创建ldap用户的时候我们会调用一个自己预先编好的脚本执行程序自动创建hdfs和linux用户目录。然而后期我们要对用户数据权限进行控制,所以集成了sentry,然而sentry在hive引擎中是无效的(hive引擎使用的hive beeline方式执行脚本),所以采用jdbc执行脚本,经过ldap用戶和密码验证后,才可访问hive数据。所以在使用中我们自己开发一套页面存储脚本(因为我们存放的脚本位置要根据我们的用户体系和环境体系来存储脚本,所以自己开发了一套页面)和执行脚本,主要使用jdbc和spark脚本来处理数据。
公司spark版本是2.2.0.cloudera2,所以把编译的cdh spark替换成集成的spark包,并把json4这类的冲突的包删掉,具体看linkis环境部署指南。
linkis引擎的ldap用户是不带用户组的,所以我们修改了LDAPUtils这个类的登录方法
image

linkis jdbc编译的包是不支持cdh5.14.2的所以把jdbc-entrance中的hive-jdbc包换成cdh5.14.2的并增加hive-service5.14.2的包,才能正常使用jdbc执行脚本。
期望功能:
日志打印:jdbc日志打印看不到异常信息,必须到jdbc-entrance的log日志目录去看,而且关键信息也只能看到jdbc Exception这种信息,无法分析问题。
结果集获取:无法获取超过5000条以外的数据,有时候获取结果集还会进度条卡死不动。
执行出错:同一用户使用jmeter并发执行50条sql,linkis报队列执行已满,不在响应,sdk不能迅速捕获异常信息。
目前版本的linkis脚本引擎启动时间过长,希望可以改善。
目前遇到了一些脚本持续停留在排队中的情况,重启引擎后才可以正常执行。

Originally posted by @JavaMrYang in #12 (comment)

【待认领】Meetup 03期活动视频剪辑

活动详情:[《Meetup预告:Linkis新版本介绍以及DSS的应用实践》] (https://mp.weixin.qq.com/s/jKPidHqP-NmCQuUPFmbniA)

【活动素材】
1、活动视频;
2、两位讲师的PPT;
链接: https://pan.baidu.com/s/1ocrDERTFNrY64Ckk4vUboQ?pwd=pixy 提取码: pixy

【需求描述】
1、把活动视频剪辑为2个视频,根据讲师主题来剪辑,分为:《Meetup 03期:Linkis-1.1.0新功能介绍》和《Meetup 03期:上海合合信息合数据工坊IDS》

【参考资料】
1、微众开源B站视频:https://space.bilibili.com/598542776?spm_id_from=333.337.search-card.all.click
2、微众银行WeDataSphere:https://www.bilibili.com/video/BV1XY4y157vW/?vd_source=9aa07872ebe6004451a9df19451c056a

完成时间:6月15日(内容提交给Andy验收)
上传B站时间:6月17日

如有任何疑问,请联系Andy

【有奖征文】Linkis是如何为Boss直聘数据中台赋能的?

一、Boss直聘数星平台介绍

结合业内的大数据业务经验,以及在公司内的实际业务环境,为了帮助数据和技术人员可以更加方便地利用大数据技术、帮助产品运营等业务人员可以轻松用数据创建业务价值,我们自研了一站式大数据应用开发和数据管理平台——数星平台。

 围绕着数据的血缘链路,从数据源头的生产管理,到数据同步采集、数据存储,实时离线计算引擎,
 再到数据的服务化,对接业务应用等,实现了数据开发、任务运维、自助分析、数据管理、项目管理、调度管理及多租户管理等功能。
 数星平台将数据开发、数据分析、数据ETL等数据科学工作通过工作流的方式有效地串联起来,
 提高了数据开发工程师和数据分析工程师的工作效率;作用于业务上,提高了数据价值变现的能力。

目前支持:
4套集群(实时Flink on Yarn集群 + Hadoop集群 + Spark集群 + Hbase集群)稳定使用;
数星平台—入口flume系统:每天采集400多亿条日志,接入实时的 ETL 存储解析流程,交付下游数据团队稳定使用;
数星平台—入口Dbus系统:接入关系型数据库表数量5w+ 个,每日完成过亿条数据记录的同步操作;
数星平台—调度系统:承接了约 22400+ 的调度任务,涵盖各业务线应用的核心数据计算环节;
DMP用户画像系统:接入了 5000+ 的用户标签,每日完成几万次人群画像分析服务;

目前服务近1000名产研、数据、算法的同学。

image

二、建设路径

现状问题: 如何将各BU大数据技术堆栈百花齐放转变为统一的平台
方案方案:四步走:工具、平台、数据、应用
第一步:全链路工具积累各场景用户量,datax、flume、kafka、canal、Ranger等等,提供不同场景的工具包指的获取用户流量。
第二步:工具互通形成平台,提高效率:kafka 对接 flume形成统一采集平台、canal 对接 kafka 统一binlog服务;
第三步:沉淀公共数据指标,提供数据地图、开发平台等产品,增加用户粘性。
第四步:场景化pipeline抽象应用工厂提高数据应用研发上线的效率,将能形成闭环的数据产品做出公共应用,为业务方做乘法,为自己做加法。

image

三、Linkis的价值

数据开发平台,尤其是hive、spark方面,linkis提供了整体技术解决方案,网关、入口、引擎、资源管理、公共服务等模块抽象了开发平台的业务,为后续新引擎的扩展提供了无限可能,目前日均SQL任务量4000左右。
Linkis使用心得:
(1)代码质量高:分包、分层合理。
(2)架构设计精良:子系统职责分明,通讯协议规范。参考我们参考Publicservice聚合不同领域API的模式实现了StreamManager模块。
(3)运行稳定:目前承接了很多对可用性要求高的,消息推送业务,整体运行稳定性比较好。
(4)可扩展性强,面向接口编程,方便后续扩展实现新特性。例如:storage子系统的com.webank.wedatasphere.linkis.storage.fs文件系统接口可以很方便实现其他存储系统。

四、扩展点:让Linkis支持数据角色,解决百名数据员工权限混乱问题。

image

问题:	     
   按照用户和资源进行绑定授权,会带来以下问题:
(1)hadoop用户量膨胀,/user/*目录管控治理繁杂
(2)员工离职入职权限初始化繁琐
(3)按照部门进行资源审计工作量大
(4)权限元数据信息膨胀
解决方案:
    将数据按照角色进行管理、授权
(1) 登录用户、引擎用户隔离,具体办法:loginUser(用户账号)、umUser(数据角色)
(2)集成数据权限系统,进行角色校验,gateway网关透传entrance的时候引擎的时候切角色账号
(3)Apache Ranger 解决hive、spark权限,带来新问题:hdfs rpc超时严重,重写Apache Ranger API以及hdfs插件去除无用的鉴权例如:执行权限。

五、扩展点:互相打通形成新功能,数据订阅服务

问题:定时日报、周报,简单的数据监控场景,形式SQL+定时+邮件
解决方案:Linkis Web进行定时任务录入,包括:SQL、触发时间、目标邮件地址等信息,调度系统扩展Linkis 执行器,集成SDK即可。

六、扩展点:Impala引擎

问题:JDBC方式天然缺陷,无法获取进度条,缺失失效重连机制等
解决方案:
		1、thrift协议  thrift -r --gen java TCLIService.thrift(hive-1-api目录)
		2、重连机制,实现连接池
		3、impala、hive 队列不匹配问题,需要按照一定逻辑做好队列映射。

七、扩展点:权限体系 + Mysql、ClickHouse引擎

问题:Mysql、ClickHouse、PostgreSQL、Kylin...... 账号权限控制
代理授权系统: 用户、组账号、库类型、库实例、库/表、可操作权限
数据源管理系统:库类型、库实例、操作类型、账号密码
使用:库实例、SQL、角色账号

八、扩展点:Flink实时计算平台

特性:Engine需要支持提交flink 不同版本的作业到不同的yarn集群, 例如:flink  1.9、1.11 ,因为不同版本的Flink submit 任务的API 和机制 不一样,所以Engine也要支持插件化,并且在SQL 的开发场景,会面临同一时间段的SQL提交到一个 Flink Session中。参考Linkis publicservice 聚合服务的机制,设计了streamManager,支持作业开发、物料版本控制、作业上线/回滚、日志搜索、监控报警等工作。
线上环境:DataStream、SQL 
开发环境:SQL
解决方案:
(1)StreamManager
(2)FlinkEntrance、FlinkEngineManager、FlinkEngine
(3)EngineGroup机制

九、扩展点:Linkis SDK 在Push推送以及特征计算同步一体化场景的运用

主要解决:上G结果集,数据传输的问题
支持3种方式下载结果集:
(1)默认直接下载
(2)经过publicservice 流下载:Linkis Dolphin ResultSetReader.getResultSetReader 反序列化读取即可。
(3)直接连接集群下载:获取result path,hadoop FileSystem read即可,需要注意的是去掉Linkis Dolphin协议,换成txt即可。

十、解决数据最后一公里赋能的问题:如何将Hive表变成Http、Dubbo接口?数据API服务方面的探讨

image

问题:数据分析师、数据工程师 数据上线的问题
两条pipeline,协议代码自动生成:支持dubbo、http
(1) online: hive、hbase、redis 、api、用户产品
(2) offline: hive、clickhouse、BI
特性:资源自助申请、K8S容器化部署管理、网关路由监控配置管理:Spring Cloud、Nacos、Sentinel
1、数据开发者(数仓|分析师):少做一些事
	减轻开发:减轻DM/DWS层到APP层的数据开发同步工作,对导入结果表类型无感知,在整个需求中可以提前退场解放
2、数据调用方(服务端RD):少做一些事,配接口、调接口方便
	分担职责:数据开发和运维的职责转移到上游,专注业务逻辑开发,对上游表类型无感知
	代码分离:数据代码、业务代码分离,方便业务交接
	方便调试:数据代码通过配置自动生成,可快速调试(测试调整、线上调整)
3、数据应用方(业务方PM):流程清楚,需求交付快
流程清晰:基于数据的业务应用开发流程清晰化(数据开发-数据接口配置-业务代码开发),便于跟踪进度(节点-相关人)
缩短需求周期:数据代码自动生成代替手动开发,缩短业务需求的整体开发周期
降低“改需求”的成本:若要基于OS接口的业务应用做优化迭代,无需阅读修改代码,界面改配置即可
4、系统维护方(管理员):可管控,可追踪

image

Linkis,目前是我们这边数据平台DAU排名top3的产品,感谢微众银行的同学!

【有奖征文】Linkis 新引擎实现分享

Linkis 新引擎实现分享

在社区大佬的帮助下,我们完成了 0.11 版本的开发,实现了 ElasticSearch 和 Presto 引擎。具体的开发文档可以参考: Linkis引擎开发文档

执行引擎架构的选择

目前 Linkis 的架构可以分为两种,一种是 Entrance-EngineManger-Engine 的模式,一种是 Entrance 模式。统一执行服务的架构可以参考官方文档: Linkis-UJES设计文档

Entrance 服务作为执行的入口,主要负责任务的持久化工作,日志的输出,进行脚本的校验和变量替换,并与 Engine、EngineManager 服务交互,向可用的 Engine 发送执行任务的请求,或者向 EngineManager 发送启动 Engine 的请求。

EngineManager 服务主要负责 Engine 的启动,进行 Engine 请求资源的请求与释放,并持续监控 Engine 的状态。

Engine 服务负责任务的具体执行,包括了任务执行的一些初始化操作、任务脚本的切分、任务的执行、任务的进度监控和结果集的保存等工作。

Spark、Hive 引擎是 Entrance-EngineManger-Engine 模式实现,在这个模式中 Engine 作为 Spark 、Hive 任务的 Driver 端,向外暴露接口可持续的接受 Entrance 发来的请求,完成任务的执行。这个模式中不仅实现了多租户的任务隔离,还提供了单用户的引擎复用,尽量减少 Engine 的启动,大大提高了执行的效率。

上面的各个服务可以看到每个服务的职责非常的明确,不过多个服务也让整个的架构变的比较重,有一些轻量的执行没有必要通过 Entrance-EngineManger-Engine 模式进行实现。例如 Linkis JDBC 引擎的实现就是通过 Entrance 的模式。JDBC 引擎的职责就是作为 JDBC 连接的客户端向服务端发送请求,并进行连接的维护。JDBC 连接的维护是比较轻量级的,而且 JDBC 连接的复用也不是根据平台用户进行区分的,所以单独为每个用户启动一个引擎是没有必要的。

ElasticSearch 和 Presto 的客户端实际上就是 Http Client,所以 ElasticSearch 和 Presto 引擎的实现也应该是比较轻量的,最终我们实现的 ElasticSearch 和 Presto 引擎也是通过 Entrance 的模式实现的。

引擎资源控制

Linkis 的资源管理服务,用来管理用户、系统的资源和并发的控制,实现新的引擎需要考虑到引擎资源相关接口的实现。具体架构可参考:Linkis RM设计文档

Entrance-EngineManger-Engine 模式资源控制

Entrance-EngineManger-Engine 的模式,资源相关主要需要下面两个实现:

  1. EngineManger 注册资源
    Linkis RM设计文档中可以看到,EngineManger 作为 Engine 资源的管理者,需要先向 ResourceManger 进行管理资源的注册。
    Linkis 已经将 EngineManger 注册资源的逻辑进行了抽象,实现的时候只需要在 SpringConfiguration 中进行配置创建 resources 的 spring bean 对象,可以参考 SparkEngineManagerSpringConfiguration 的实现。
// com.webank.wedatasphere.linkis.enginemanager.configuration.SparkEngineManagerSpringConfiguration

@Configuration
class SparkEngineManagerSpringConfiguration {

  @Bean(Array("resources"))
  def createResource(): ModuleInfo = {
    val totalResource = new DriverAndYarnResource(
      new LoadInstanceResource(ENGINE_MANAGER_MAX_MEMORY_AVAILABLE.getValue.toLong,
        ENGINE_MANAGER_MAX_CORES_AVAILABLE.getValue, ENGINE_MANAGER_MAX_CREATE_INSTANCES.getValue),
      null
    )

    val protectedResource = new DriverAndYarnResource(
      new LoadInstanceResource(ENGINE_MANAGER_PROTECTED_MEMORY.getValue.toLong, ENGINE_MANAGER_PROTECTED_CORES.getValue,
        ENGINE_MANAGER_PROTECTED_INSTANCES.getValue),
      null
    )

    ModuleInfo(Sender.getThisServiceInstance, totalResource, protectedResource, ResourceRequestPolicy.DriverAndYarn)
  }
  // ...
}
  1. EngineResourceFactory 实现
    EngineManager 创建 Engine 的时候需要先向 ResourceManger 去请求资源,所以新引擎需要提供 EngineResourceFactory 的实现,用来初始化创新 Engine 所需要的资源,再向 ResourceManger 进行请求。
    Linkis 中提供了 AbstractEngineResourceFactory 的抽象,实现的时候只需要从 AbstractEngineResourceFactory 继承。具体可参考 SparkEngineResourceFactory 的实现:
// com.webank.wedatasphere.linkis.enginemanager.configuration.SparkEngineResourceFactory

@Component("engineResourceFactory")
class SparkEngineResourceFactory extends AbstractEngineResourceFactory {

  override protected def getRequestResource(properties: java.util.Map[String, String]): DriverAndYarnResource = {
    val executorNum = DWC_SPARK_EXECUTOR_INSTANCES.getValue(properties)
    new DriverAndYarnResource(
      new LoadInstanceResource(ByteTimeUtils.byteStringAsBytes(DWC_SPARK_DRIVER_MEMORY.getValue(properties) + "G"),
        DWC_SPARK_DRIVER_CORES,
        1),
      new YarnResource(ByteTimeUtils.byteStringAsBytes(DWC_SPARK_EXECUTOR_MEMORY.getValue(properties) * executorNum + "G"),
        DWC_SPARK_EXECUTOR_CORES.getValue(properties) * executorNum,
        0,
        DWC_QUEUE_NAME.getValue(properties))
    )
  }
}

Entrance 模式并发控制

Lnkis 中将 Engine 的实例数作为资源的一种,目前用户请求的并发是通过 Engine 的实例数进行控制的,那么在 Entrance 的模式下,就没有很好的对用户的并发进行控制。

在 ElasticSearch 和 Presto 的实现中,我们参考了 EngineManager 的资源控制,将并发数作为资源的一种,在 Entrance 启动时进行模块资源注册。将每个执行作为一个实例,执行发生时先进行资源的请求和锁定,执行完成后进行资源的释放,从而达到用户并发的控制。

主要包括了以下步骤:

  1. Entrance 注册并发资源
    Entrance 注册并发资源,需要创建资源实例,将并发作为资源的一部分,然后配合 @EnableResourceManager 和 @RegisterResource 注解进行资源注册。
  // 定义资源
  @Bean(Array("resources"))
  def createResource(): ModuleInfo = {
    // 创建并发资源实例,分为总资源和受保护的资源
    val totalResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_MAX_JOB_INSTANCE.getValue)
    val protectResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_PROTECTED_JOB_INSTANCE.getValue)
    info(s"create resource for es engine totalResource is $totalResource, protectResource is $protectResource")
    ModuleInfo(Sender.getThisServiceInstance, totalResource, protectResource, ResourceRequestPolicy.Instance)
  }
  
  // 注册资源
  @RegisterResource
  def registerResources(): ModuleInfo = resources
  1. 执行前请求锁定资源
    执行实例初始化前,先通过 ResourceManagerClient#requestResource 方法请求锁定并发实例资源。
rmClient.requestResource(requestEngine.user, requestEngine.creator, new InstanceResource(1)) match {
  case NotEnoughResource(reason) =>
    // 没有请求到资源,抛出异常
    throw EsEngineException(LogUtils.generateWarn(reason))
  case AvailableResource(ticketId) => {
    // 请求到资源,创建执行实例,并保存 ticketId 用于释放资源
    // ...
    // 当资源被实例化后,返回实际占用的资源总量
    rmClient.resourceInited(UserResultResource(ticketId, requestEngine.user), new InstanceResource(1))
  }
}
  1. 执行完成释放资源
    执行完成后销毁执行实例,并通过 ResourceManagerClient#resourceReleased 方法释放锁定的资源。
// 使用 ticketId 释放对应的资源
rmClient.resourceReleased(UserResultResource(ticketId, requestEngine.user))

ElasticSearch 引擎的实现

下面是微众王和平大佬帮忙画的 ElasticSearch 引擎整体的架构图:

ElasticSearch引擎架构图

Linkis 新引擎的实现还是比较容易的,ElasticSearch 引擎的代码结构如下,整体的代码量也是比较少。主要包括了资源的配置、执行器的实例化和ElasticSearch请求与结果解析的相关代码。

Es引擎代码结构

  1. 资源注册
    ElasticSearch 引擎需要考虑到用户请求的并发和 Entrance 整体并发的控制。
    Entrance 启动时,需要对 Entrance 可用资源进行注册,主要包括了最大实例数和保护的阈值。在 EsSpringConfiguration 中生成资源的 bean 对象,并传入 EsEngineManager 进行注册,配置 @EnableResourceManager 和 @RegisterResource 就会自动进行注册。
// com.webank.wedatasphere.linkis.entrance.conf.EsSpringConfiguration
class EsSpringConfiguration extends Logging{

  @Bean(Array("resources"))
  def createResource(@Autowired rmClient: ResourceManagerClient): ModuleInfo = {
    // Clean up resources before creating resources to prevent dirty data when exiting abnormally (创造资源之前进行资源清理,防止异常退出时产生了脏数据)
    Utils.tryQuietly(rmClient.unregister())
    Utils.addShutdownHook({
      info("rmClient shutdown, unregister resource...")
      rmClient.unregister
    })
    val totalResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_MAX_JOB_INSTANCE.getValue)
    val protectResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_PROTECTED_JOB_INSTANCE.getValue)
    info(s"create resource for es engine totalResource is $totalResource, protectResource is $protectResource")
    ModuleInfo(Sender.getThisServiceInstance, totalResource, protectResource, ResourceRequestPolicy.Instance)
  }

}

// com.webank.wedatasphere.linkis.entrance.execute.EsEngineManager
@EnableResourceManager
class EsEngineManager(resources: ModuleInfo) extends EngineManager with Logging {

  @RegisterResource
  def registerResources(): ModuleInfo = resources

}
  1. 请求执行器
    EsEngineRequester 启动一个执行器,用于任务的执行,通过 request 方法对传入的 job 生成一个执行的 EsEntranceEngine,请求时先向 ResourceManager 请求并锁定一个实例的资源,在 EsEntranceEngine 执行结束后会进行释放。
// com.webank.wedatasphere.linkis.entrance.execute.EsEngineRequester
class EsEngineRequester(groupFactory: GroupFactory, rmClient: ResourceManagerClient) extends EngineRequester {
  override def request(job: Job): Option[EntranceEngine] = job match {
    case entranceJob: EntranceJob => {
      val requestEngine = createRequestEngine(job);
      // request resource manager
      rmClient.requestResource(requestEngine.user, requestEngine.creator, new InstanceResource(1)) match {
        case NotEnoughResource(reason) =>
          throw EsEngineException(LogUtils.generateWarn(reason))
        case AvailableResource(ticketId) => {
          val engine = new EsEntranceEngine(idGenerator.incrementAndGet(), new util.HashMap[String, String](requestEngine.properties)
            , () => {rmClient.resourceReleased(UserResultResource(ticketId, requestEngine.user))})
          engine.setGroup(groupFactory.getOrCreateGroup(getGroupName(requestEngine.creator, requestEngine.user)))
          engine.setUser(requestEngine.user)
          engine.setCreator(requestEngine.creator)
//          engine.updateState(ExecutorState.Starting, ExecutorState.Idle, null, null)
          engine.setJob(entranceJob)
          engine.init()
          executorListener.foreach(_.onExecutorCreated(engine))
          rmClient.resourceInited(UserResultResource(ticketId, requestEngine.user), new InstanceResource(1))
          Option(engine)
        }
      }
    }
    case _ => None
  }
}
// com.webank.wedatasphere.linkis.entrance.execute.EsEntranceEngine
class EsEntranceEngine(id: Long, properties: JMap[String, String], resourceRelease: () => Unit) extends EntranceEngine(id) with SingleTaskOperateSupport with SingleTaskInfoSupport {
  override def close(): Unit = {
    try {
      this.job.setResultSize(0)
      this.engineExecutor.close
      // 释放资源
      resourceRelease()
      // ......
}
  1. 任务执行
    EsEntranceEngine 是 com.webank.wedatasphere.linkis.entrance.execute.EntranceEngine 的实现,进行脚本的执行。在这里抽出一层 EsEngineExecutor 作为 Es 任务的具体执行。EsEntranceEngine 则负责 EsEngineExecutor 的初始化、脚本解析切分等实现。
class EsEntranceEngine(id: Long, properties: JMap[String, String], resourceRelease: () => Unit) extends EntranceEngine(id) with SingleTaskOperateSupport with SingleTaskInfoSupport {
  private var engineExecutor: EsEngineExecutor = _
  // ...
  override def execute(executeRequest: ExecuteRequest): ExecuteResponse   // ...
  protected def executeLine(code: String): ExecuteResponse = this.engineExecutor.executeLine(code, storePath, s"_$codeLine")
  
}
  1. ElasticSearch 脚本执行
    entrance.executor 包中就是 ElasticSearch 客户端的封装、请求的封装和结果的解析等相关代码。
    ElasticSearch 客户端封装在 EsClient 中,通过 EsClientFactory 进行实例化,并将 datasourceName 作为唯一 Key 进行缓存。
    EsEngineExecutorImpl 是 EsEngineExecutor 的实现,用于任务的执行。
    ResponseHandlerImpl 用于结果的处理,会根据 ElasticSearch 的返回类型进行反序列化,并保存为 Linkis 的 ResultSet。

DataSource 路由

在与微众大佬的讨论交流中得知后面 Linkis 的架构将会引入 DataSource 的概念,DataSource 模块维护引擎的连接信息和集群等信息,可以减少一些数据源运行配置,方便数据源配置和权限管理,为数据平台提供元数据信息,并可根据 DataSource 进行路由实现多集群的路由。

在 Linkis-0.11.0版本中添加了 linkis-gateway-ujes-datasource-ruler 模块,作为一个 Gateway 插件的形式简单实现了,请求和 Entrance 的路由。

linkis-gateway-ujes-datasource-ruler 模块的实现

抽象出 EntranceGatewayRouterRuler 接口用于执行路由规则,在 Gateway 模块的 EntranceGatewayRouter 中注入 EntranceGatewayRouterRuler 实例。

@Component
class EntranceGatewayRouter extends AbstractGatewayRouter {

  @Autowired(required = false)
  private var rules: Array[EntranceGatewayRouterRuler] = _
  
  override def route(gatewayContext: GatewayContext): ServiceInstance = {
    gatewayContext.getGatewayRoute.getRequestURI match {
      case EntranceGatewayRouter.ENTRANCE_REGEX(_) =>
        // ...
        serviceId.map(applicationName => {
          rules match {
            case array: Array[EntranceGatewayRouterRuler] => array.foreach(_.rule(applicationName, gatewayContext))
            case _ =>
          }
          ServiceInstance(applicationName, gatewayContext.getGatewayRoute.getServiceInstance.getInstance)
        }).orNull
      case _ => null
    }
  }
  
}

linkis-gateway-ujes-datasource-ruler 模块,主要是做了一个 DataSource 和 Entrance Instance 的映射,并保存在 Mysql 中。DatasourceGatewayRouterRuler 实现了具体的路由策略,DatasourceMapService 接口维护 DataSource 映射。

// 维护 DataSource 映射的接口
public interface DatasourceMapService {

    String getInstanceByDatasource(String datasourceName);

    long countByInstance(String instance);

    String insertDatasourceMap(String datasourceName, String instance, String serviceId);

}

// EntranceGatewayRouterRuler 的实现类,执行具体的路由逻辑
class DatasourceGatewayRouterRuler extends EntranceGatewayRouterRuler with Logging {

  // 路由的方法
  override def rule(serviceId: String, gatewayContext: GatewayContext): Unit = if(StringUtils.isNotBlank(gatewayContext.getRequest.getRequestBody)) {
    // 从请求中获取 datasourceName
    val datasourceName = getDatasourceName(gatewayContext.getRequest.getRequestBody)
    if (StringUtils.isBlank(datasourceName)) return
    debug(s"datasourceName: $datasourceName")
    // 通过 datasourceName 获取映射
    datasourceMapService.getInstanceByDatasource(datasourceName) match {
      case i: String if StringUtils.isNotBlank(i) => 
        // 存在映射直接返回 Instance
        gatewayContext.getGatewayRoute.getServiceInstance.setInstance(i)
      case _ => {
        // 不存在映射时,先获取 Instance 列表,并根据已经存在映射的数据按照从小到大排序,获取最少映射的 Instance,插入 DataSource 映射并返回
        val newInstance = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId)
          .map(item => (item, datasourceMapService.countByInstance(item.getInstance)))
          .sortBy(_._2).map(_._1.getInstance).headOption match {
            case Some(item) => datasourceMapService.insertDatasourceMap(datasourceName, item, serviceId)
            case None => null
          }
        debug(s"newInstance: $newInstance")
        if (StringUtils.isNotBlank(newInstance)) {
          gatewayContext.getGatewayRoute.getServiceInstance.setInstance(newInstance)
        }
      }
    }
  }
  
}

【有奖征文】DSS+Linkis在知因智慧使用情况

一.应用场景

首先感谢社区各位大佬的指点,学习到很多。

知因智慧是一家toB金融公司,里面需要大量的ETL过程,原先用Shell脚本连接各种Hql,Spark等等,XXL- Job调度,可能一个模块就被一个大的脚本包含住了,耦合性特别强,调度这块也有问题,无法监控中间的报错,2019下半年时看到社区开源组件,一直研究怎么跟公司整合。

希望借助社区的力量,结合公司实际情况,打通公司级数据中台的流程,目前数据建设主要集中在元数据管理,数据仓库ETL流程,数据质量,任务调度这几个方面。

二. 解决的问题

基于LDAP服务

基于LDAP管理用户,代理服务模块修改,以组为单位共用账户,公司的整个数据开发人员不多,基于这种方式可以支撑下去。

object LDAPUtils extends Logging {
  val url =  CommonVars("wds.linkis.ldap.proxy.url", "").getValue
  val baseDN = CommonVars("wds.linkis.ldap.proxy.baseDN", "").getValue
  def login(userID: String, password: String): Unit = {
    if(userID.isEmpty) throw new NamingException("userID is null")
    val env = new Hashtable[String, String]()
    val bindDN = "uid="+userID+","
    val bindPassword = password
    env.put(Context.SECURITY_AUTHENTICATION, "simple")
    env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory")
    env.put(Context.PROVIDER_URL, url)
    env.put(Context.SECURITY_PRINCIPAL, bindDN+baseDN)
    env.put(Context.SECURITY_CREDENTIALS, bindPassword)
    new InitialLdapContext(env, null)
    info(s"user $userID login success.")
  }
}

放开权限

因为资源有限,有些权限管理很繁琐,放开一些权限:

filesystem.path 存储日志,脚本的权限,统一根据nfs挂载,把目录权限统一根据用户随意修改。

linkis-metadata元数据管理,把Hive相关权限放开(HiveMetaDao.xml):

<select id="getDbsByUser" resultType="java.lang.String" parameterType="java.lang.String" databaseId="mysql">
   select NAME from DBS GROUP BY NAME order by NAME
</select>
<select id="getTablesByDbNameAndUser" resultType="map"  parameterType="map" databaseId="mysql">
    select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
    from TBLS t2, DBS t3 where 1=1 and t2.DB_ID=t3.DB_ID and t3.NAME = #{dbName,jdbcType=VARCHAR}
    order by NAME;
</select>

依赖兼容问题

因为环境是CDH-5.16.2 编译部署DSS和Linkis是根据原生的版本号,大部分服务都没有问题,但是有的服务有些问题,因为CDH会把组件重新编译,有的指令会改变。

原先发生过Hive 和 Spark应该支持的函数,到Scriptis上运行脚本,不支持,这是因为得把两个服务相关的Hive 和 Spark jar 都变成后缀带有CDH的。

image

tez的支持

Linkis Hive引擎对tez的支持:

  • tez相关jar放到linkis-ujes-hive-enginemanager/lib
  • linkis.properties配置Hive配置文件目录,hive-site.xml配置文件中
<property>
    <name>tez.lib.uris</name>
    <value>hdfs:///apps/tez/tez-0.8.5.tar.gz</value>
  </property>
   <property>
    <name>hive.tez.container.size</name>
    <value>10240</value>
  </property>

Shell定义变量

image

自定义变量的支持:

CustomVariableUtils 工具类中,Shell关枚举都要添加上。

/**
 * @Classname ShellScriptCompaction
 * @Description TODO
 * @Date 2020/8/19 18:22
 * @Created by limeng
 */
class ShellScriptCompaction private extends CommonScriptCompaction{
  override def prefixConf: String = "#conf@set"
  override def prefix: String = "#@set"
  override def belongTo(suffix: String): Boolean ={
    suffix match {
      case "sh"=>true
      case _=>false
    }
  }
}
object ShellScriptCompaction{
  val shellScriptCompaction:ShellScriptCompaction=new ShellScriptCompaction
  def apply(): CommonScriptCompaction = shellScriptCompaction
}

ScriptFsWriter Shell相关 def listCompactions(): Array[Compaction] = Array(PYScriptCompaction(),QLScriptCompaction(),ScalaScriptCompaction(),ShellScriptCompaction())
WorkspaceUtil 工具类正则有问题,无法修改名称,中间有.符号去除

public static void fileAndDirNameSpecialCharCheck(String path) throws WorkSpaceException {
    String name = new File(path).getName();
    LOGGER.info(path);
    String specialRegEx = "[ _`~!@#$%^&*()+=|{}':;',\\[\\]<>/?~!@#¥%……&*()——+|{}【】‘;:”“’。,、?]|\n|\r|\t";
    Pattern specialPattern = Pattern.compile(specialRegEx);
    if(specialPattern.matcher(name).find()){
        throw new WorkSpaceException("the path exist special char");
    }
}

使用 eventReceiver节点异常(eventchecker组件)#247

EventCheckerNodeExecution.scala
      Utils.tryFinally {
		
              resultSetWriter.addMetaData(null)
		
              resultSetWriter.addRecord(new LineRecord(action.saveKeyAndValue))
		
           	
            }(Utils.tryQuietly(resultSetWriter.close()))
		
          }
          response.setIsSucceed(true)
		
        }else{
        
.............................................
AppJointEntranceJob.scala
override def run(): Unit = {
  if(!isScheduled) return
  info(s"$getId starts to run")
  getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo(s"$getId starts to execute.")))
  startTime = System.currentTimeMillis
  getExecutor match {
    case appjointEntranceEngine:AppJointEntranceEngine => appjointEntranceEngine.setJob(this)
      appjointEntranceEngine.setInstance(Sender.getThisInstance)
  }
  Utils.tryAndErrorMsg(transition(Running))(s"transition $getId from Scheduler to Running failed.")

加入spark streaming

  • linkis-ujes-spark-enginemanager引入依赖,spark-streaming相关。
  • SparkScalaExecutor.scala bindSparkSession方法引入相关依赖

测试结果
企业微信截图_16062055657760

调试服务

把有问题的服务,bin目录下启动脚本,远程debug打开

image

因为平台Cookie的原因,直接用接口发送请求,有的无法调试:

import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
/**
 * @Classname HttpUtil
 * @Description TODO
 * @Date 2020/10/30 14:49
 * @Created by limeng
 */
public class HttpUtil {
    public static RestTemplate getRestClient(){
        CloseableHttpClient build =  HttpClientBuilder.create().useSystemProperties().build();
        return new RestTemplate(new HttpComponentsClientHttpRequestFactory(build));
    }
}
import com.linkis.web.utils.HttpUtil;
import net.sf.json.JSONObject;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
/**
 * @Classname LinkisMain
 * @Description TODO
 * @Date 2020/10/30 14:54
 * @Created by limeng
 * 测试类 EntranceRestfulTest
 */
public class LinkisMain {
    public static void main(String[] args) {
        RestTemplate restClient = HttpUtil.getRestClient();
        JSONObject postData = new JSONObject();
        postData.put("password","hdfs");
        postData.put("userName","hdfs");
        String loginUrl = "http://192.168.200.116:8088/api/rest_j/v1/user/login";
        ResponseEntity<JSONObject> jsonResponseEntity = restClient.postForEntity(loginUrl, postData, JSONObject.class);
        System.out.println("状态码:"+jsonResponseEntity.getStatusCodeValue());
        JSONObject body = jsonResponseEntity.getBody();
        System.out.println("body :" + body.toString());
    }
}

三.最佳实践

我以公司标签库为例,讲述下操作流程。

对企业数据进行挖掘和分析,建立标签特征体系,创建个性化的多层级标签,并在此基础上进行细分和精准营销场景应用,有利于对企业/集团进行深入经营,充分挖掘企业/集团潜力,提升企业/集团价值。以此为目标的标签库的构建,将有利于了解和深耕企业/集团,可更好地助力于企业金融服务。

image

image

这是执行流程图,中间调度过程,Hql,Spark目前运行在DSS平台上。

image

其中一个企业经营特别的跑批流程。

软件版本

CDH-5.16.2

Hadoop-2.6

Hive-1.1

Spark-2.2

kafka_2.11-1.0.1

【有奖征文】天翼云对DSS的公有云改造和后续思路

天翼云对DSS的公有云改造和后续思路

背景

在此之前,**电信对内的大数据都没有一个统一的入口,我们大数据团队的负责人刚神很早就关注了wedata社区。在开源方面,DataSphere Sdutio(以下简称DSS)作为大数据一站式开发平台是非常优秀的,同类竞品也比较少或者不够完善,所以我们团队选择了DSS。在我们内部使用一段时间后大家的反馈都还不错,同时领导也提出了把dss+linkis改造成公有云版本的建议,我们团队经过讨论达成一致意见并迅速的开展了相关工作,历时2个多月的时间改造完成并且上线。在这期间得到社区的大力支持,特别是强哥、平哥、黄哥等微众大佬的鼎力支持和问题解答,在此深表感谢。以下就改造工作简单做一下分享:

对接天翼云的单点登录

linkis-gateway本身是支持单点登录的,实际接入天翼云cas单点登录也遇到了一些问题,比如:cas回调接口会被网关拦截;2.dss默认设置登陆状态的方法需要移到cas模块,getbaseInfo的设置登陆需要去掉。其他的接入步骤按官方步骤基本上就能正常运行。这块我们也是做成了配置(包括回调接口,重定向地址,ctyun自己的还有appid等)
第一步新建了一个单点登录的模块(我们是ctyunsso)实现SSOInterceptor

		trait SSOInterceptor {

			  /**
			    * 如果打开SSO单点登录功能,当前端跳转SSO登录页面登录成功后,会重新跳回到DSS首页,这时DSS前端再次请求gateway,
			    *     gateway会通过调用该方法获取已SSO登录的用户,然后将用户写入cookie,保证后续请求可直接放行。
			    * 您需实现该方法,通过Request返回用户名。
			    * @param gatewayContext
			    * @return
			    */
			  def getUser(gatewayContext: GatewayContext): String
			
			  /**
			    * 通过DSS首页Url,用户生成一个可重定向的SSO登录页面URL。
			    * 要求:需带上requestUrl,以便SSO登录成功后能跳转回来
			    * @param requestUrl DSS首页URL
			    * @return 例如: https://${sso_host}:${sso_port}/cas/login?redirectUrl=${requestUrl}
			    */
			  def redirectTo(requestUrl: URI): String
			
			  /**
			    * 用户退出登录时,gateway会调用此接口,以保证gateway清除cookie后,SSO单点登录也会把登录信息清除掉
			    * @param gatewayContext
			    */
			  def logout(gatewayContext: GatewayContext): Unit
			
	}

第二步 配置和jar包

linkis-gateway增加下面配置
wds.linkis.gateway.conf.enable.sso=true  //开启单点登录模式
wds.linkis.gateway.conf.sso.interceptor=cn.ctyun.SSOXX  自己实现的单点登录类
将单点登录模块的jar包放到linkis-gataway/lib 下面
模块可以建在linkis下面,方便打包。

安全性全面增强

对天翼云来说安全有一票否决权,我们的服务都要符合各种安全策略(如https,nginx配置,验证码,代码漏洞扫描都做了改造,弱密码等),以https为例,咨询社区暂时没考虑https,我们自己做了升级https,发现几乎所有模块都有问题,这个url同时也是后端各个模块之前通信的baseurl配置,比如qualitis,visaulis与dss之前的交互,创建删除更新项目等;因为DSS前端模块跳转的链接也要用到这个配置,所以只能配置外网的https baseurl;基本上把所有模块的httpclient,DSS下所有的appjoint,linkis的httpclient都做了改动。兼容了http和https两种调用。(这块应该把前后端url分开,因为时间原因,我们没有改动,实际证明还是应该改的)。
image

升级改造过程中也修复问题和增加的一些特性 大概20个左右
image

用户工作空间HA

社区版本用户工作空间目录默认是使用linux目录,需要在linux下建N个用户和目录,还有备份等原因我们选择了hdfs目录,社区也是支持这个功能的,但不太完善,比如前端不支持保存等,后端也存在很多问题需要适配,具体这块的坑,我的同事涛哥会有专门的文章来介绍

用户资源自动开通

社区版本因为没有太多的涉及到用户管理以及用户开通功能的版块,因此我们对这个版块进行了改造:采用代理用户的方式来实现多租户。在早期内部私有化试用阶段,流程上我们通过系统工单形式进行账户申请,技术上采用业务人员手动执行脚本的方式来进行用户开通,涉及到LDAP账户、keytab授权、hdfs目录、hive库表等多个模块的开通与授权工作,采用脚本开通需要付出一定人力成本,出现问题时排查需要付出较多精力。在进行公有云改造时,依托我们近期开发的运维平台优势,将底层资源开通的功能交给了运维平台实现,开发了一个单独的用户授权服务,将整个用户开通流程从脚本调用方式升级为运维平台API调用。当有新用户需求时,天翼云用户中心向用户授权模块下发工单,由用户授权模块调用运维平台,实现对底层资源的开通,实现了用户资源的自动开通;使得各模块职责单一,出现问题后也便于排查,提高了效率。如果这一版块其他小伙伴也存在相同的需求,我们也会将授权模块的实现框架进行开源并与社区探讨整合。
image

未来展望

现在产品还有很多需要完善的地方,比如分布式(规划如下)。
image

还有多用户管理,多集群等,新手引导,案例模板等,还有社区1.0发布之后合并到我们的公有云版本

【有奖征文】Linkis使用分享

应用场景:
公司的数据分析团队目前使用的是sas软件,界面较落后,需要在每台机器上都安装客户端,单机环境,依赖于机器配置,无法与集群速度相比,缺少高可用高并发的特性。目前公司的数据处理方式,需要先将数据下载到本地,再通过sas编写脚本处理,原始数据、脚本、结果数据会极大占用机器空间。并且sas是国外的成熟商业软件,每年的采购价格不菲。为了节省开支,以及支持办公软件国产化,所以寻找国内优秀的开源软件用以逐步替代sas。

解决问题:
使用了dss与linkis组件,以页面化的形式开发脚本,方便快捷,能够使用多种类型脚本直接操作hive数据,不需要数据导出到本地后再处理,对sas节省资源开销,相比较于sas方便的管理HDFS中的文件。目前阶段处于分析人员使用dss开发数据处理脚本,以及使用过程中的问题修复。

使用情况:
目前阶段处于linkis适配公司环境,以及修复使用问题,还未新增功能或者新引擎支持,后续如有会分享出来。
公司的大数据相关环境有专门的团队负责,我们在安装使用的过程中进行了一系列的适配:

1.大数据环境使用的是CDH 5.16.1,而源码是社区版本,所以根据具体的版本我们进行了重新编译。

2.HDFS权限被ACL接管,不与Linux系统权限同步,所以直接用hql脚本查询数据时,遇到了HDFS目录无权访问的情况。经过沟通了解到数据团队规定必须通过jdbc,经过域账号验证后,才可访问hive数据。所以在使用中暂时将hql和sql的脚本隐藏,主要使用jdbc和python脚本来处理数据。

3.公司spark版本是2.4.0.cloudera2,修改了后台识别版本的逻辑。

4.CDH对版本校验比较严格,所以修改了pyspark.zip包中的content.py文件,将社区版中的分支判断补充进来:

if allow_insecure_env == "1" or allow_insecure_env.lower() == "true":
warnings.warn(
"You are passing in an insecure Py4j gateway.  This "
"presents a security risk, and will be completely forbidden in Spark 3.0")

5.因为数据分析人员较多,所以需要创建多个账号,linkis的使用条件需要在linux机器中有对应的用户,以及linux、hdfs上的工作空间目录,权限设置等,所以我们开发了一个user-init脚本,专门针对新用户使用linkis的初始化操作。并且公司AD域账号全部为小写字母,修改登陆逻辑不强制将用户名转小写。

6.Linkis数据库列表的数据来源通过读取hive元数据库方式,为配合权限管控,修改为hiveserver2校验账号密码的方式获取有权限的数据库列表,相应的点击数据库后的查询表操作,以及查询表内容等操作,全部修改为jdbc方式。

7.增加方法函数支持python。

问题修复:
1.scriptis中左侧数据库操作,从hql转为jdbc方式,并且修改了dbs接口:

private List<String> getDbsByUser(String userName) throws ErrorException, SQLException {
    JdbcSettingResponse jdbcSetting = queryJdbcSetting(userName);

    if(StringUtils.isEmpty(jdbcSetting.url()) || StringUtils.isEmpty(jdbcSetting.userName()) || StringUtils.isEmpty(jdbcSetting.passwd()))
    {
        throw new ErrorException(9999, "JDBC配置为空,请在控制台中进行JDBC连接设置。");
    }

    HiveDatabaseAndTableRetriever retriever = new HiveDatabaseAndTableRetriever(jdbcSetting.url(), jdbcSetting.userName(), jdbcSetting.passwd());
    return retriever.getDbs();
}

private JdbcSettingResponse queryJdbcSetting(String userName) {
    JdbcSettingRequest request = new JdbcSettingRequest(userName);
    Sender sender = Sender.getSender("cloud-publicservice");
    return (JdbcSettingResponse)sender.ask(request);
}

2.scriptis中打开多个tab页时,最右边标签无法关闭:

this.tabMove.maxTabLen = Math.floor(this.width / 100) - 1;

3.脚本自定义参数保存失败:

const params = ismodifyByOldTab ? option.params : this.convertSettingParams(rst.metadata);

4.工作流中的节点脚本执行完成没有耗时显示:

<we-progress> 中增加:cost-time="script.progress.costTime"

5.工作流中的节点点击历史后,无法返回编辑页。

6.脚本开发运行结果全屏后无法退出。

7.控制台中jdbc连接设置修改后,立即生效:
原有的缓存时间是120秒,为了避免修改CacheableRPCInterceptor影响其他,所以新建了一个ShortCacheableRPCInterceptor类,缓存时间设置为3秒:

@Component
class ShortCacheableRPCInterceptor extends RPCInterceptor with Logging{

  private val guavaCache: Cache[Any, Any] = CacheBuilder.newBuilder().concurrencyLevel(5)
    .expireAfterWrite(3000, TimeUnit.MILLISECONDS).initialCapacity(20)  //TODO Make parameters(做成参数)
    .maximumSize(1000).recordStats().removalListener(new RemovalListener[Any, Any] {
    override def onRemoval(removalNotification: RemovalNotification[Any, Any]): Unit = {
      debug(s"CacheSender removed key => ${removalNotification.getKey}, value => ${removalNotification.getValue}.")
    }
  }).asInstanceOf[CacheBuilder[Any, Any]].build()

  override val order: Int = 11

  override def intercept(interceptorExchange: RPCInterceptorExchange, chain: RPCInterceptorChain): Any = interceptorExchange.getProtocol match {
    case cacheable: ShortCacheableProtocol =>
      guavaCache.get(cacheable.toString, new Callable[Any] {
        override def call(): Any = {
          val returnMsg = chain.handle(interceptorExchange)
          returnMsg match {
            case warn: WarnException =>
              throw warn
            case _ =>
              returnMsg
          }
        }
      })
    case _ => chain.handle(interceptorExchange)
  }
}

8.scriptis中左侧数据库进行删表操作后,刷新库时的定位错误问题:

this.currentAcitved = find(this.tableList, (db) => db.name === this.currentAcitved.dbName || this.currentAcitved.name);

9.用户提出希望有组概念,同组中的用户可以看到各自的脚本文件。现有的scriptis设计是租户隔离,所以我们新增了user_group表,保存用户和组关系,在user-init脚本中新增了添加用户组逻辑,修改了getUserRootPath的接口,增加了一层组目录判断(没有添加组的用户,逻辑与原本保持不变),并挂载到nas盘上,实现了同组用户在windows中可以互相查看脚本的需求。

10.scriptis中的脚本名称包含特定字符时,出现没有小图标以及丢失脚本执行按钮的问题,正则表达式判断有误:

{ rule: /(表详情)|(Table\sdetails)/, executable: false, isCanBeOpen: true },
{ rule: /(建表向导)|(Table\screation\sguide)/, executable: false, isCanBeOpen: true }

11.python支持方法函数:

//ConstantVar 新增
public final static int FUNCTION_PYTHON = 11;

12.python脚本在执行过程中取消后,再次执行时会延续上一次取消前的状态,且找不到系统方法定义 ,修改了pythonEngineExecutor的逻辑:

//executeLine中增加
if(!this.isEngineInitialized) {
  savedHookCodes.add(code)
}

if(this.pySession == null) this synchronized {
  if(this.pySession == null) {
    this.pySession = new PythonSession
    for(i <- 0 to savedHookCodes.size() -1){
      info("executeLine code when pySession is null:" + savedHookCodes.get(i))
      executeLine(engineExecutorContext, savedHookCodes.get(i))
    }
  }
}

13.jdbc脚本并发操作时会出现持续处于运行中状态的问题:
将全局变量中的connection statement 修改为executeLine方法中的局部变量来解决多线程并发问题,同时将所有的close方法修改到finally中,改动后还未复现。

14.jdbc脚本查询结果超过entrance中规定的最大缓存时(一般数据量为5000左右就会超过),查询结果不完整的问题:
executeLine中resultSetWriter没有正常关闭导致生成的dolphin文件不全。

期望功能:

  1. 脚本信息打印:当一个脚本中有多段数据处理过程时,只能看到最终执行结果,无法查询每一段的执行情况:耗时,执行生效条数等信息,使用人员分析时较为困难。
  2. 用户管理,要使用dss需要在服务器上创建系统用户,目前是通过我们自己编写的用户初始化bash脚本,创建用户、相关工作目录以及权限设置,希望后续dss版本中增加用户管理功能。
  3. 多数据源管理:可以处理hive以外的其他数据源数据。
  4. 目前版本的linkis脚本引擎启动时间过长,希望可以改善。
  5. 目前遇到了一些脚本持续停留在排队中的情况,重启引擎后才可以正常执行。

【有奖征文】linkis与SQL中间件(跨数据源混查)结合实践分享

标题:linkis与SQL中间件(跨数据源混查)结合实践分享

背景需求

-业务需求:

在公司有很多运营、数据分析的童鞋,虽然自有的BI产品功能丰富,有各式各样的定制化分析报表、各种维度的图表。
但是有时需要临时查询数据,发现数据是分布在不同数据源里的,也有可能数据来自不同业务的不同集群。
比如查询hive表的数据,但是维度映射数据在mysql里。还有要两个mysql数据库(不同服务器)关联查询等情况,
平时这些需求都需要程序猿大神们写程序实现。

现在可以通过Linkis与SQL中间件完美结合满足上述需求。难道不香吗?

-技术迭代:

之前使用的Apache Livy作为SQL执行入口,把查询的请求都提交给Livy,但是体验上不尽人意。
一直想找个替代组件, 后来发现功能强大的linkis。至于Linkis跟Apache Livy的对比,可以查看官方相关文档。

SQL中间件介绍:

SQL中间件是基于公司目前开源的XSQL和Quicksql两款SQL中间件,两种都支持跨数据源混查,两个都很优秀,
至于大家选择集成哪个可以根据自身情况决定。因为之前使用过XSQL,所以是在linkis增加了xsql查询引擎。

以下分别简单介绍下两款开源组件:

XSQL:

XSQL是一款低门槛、更稳定的分布式查询引擎。它允许你快速、近实时地查询大量数据。对于一些数据源(例如:Elasticsearch、MongoDB、Druid等),他可以大幅地降低学习曲线,并节省人力成本。除Hive外,每种数据源都支持除子查询外的下推执行优化。用户有时希望将位于不同数据源上的数据关联起来进行查询,但是由于各种数据源是异构的且一些数据源不支持SQL或者支持的SQL语法非常有限,因此传统互联网公司的做法是,将不同的数据同步到统一的存储介质中,再进行OLAP的查询。数据同步的过程中可能面临数据迁移、主从同步、网络带宽等诸多困难和挑战,而且需要浪费大量的人力、物力及时间,无法满足大数据产品当前阶段对于近实时甚至准实时的场景。通过XSQL你将可以避免数据迁移和时间浪费,更加专注于业务本身。XSQL可以通过下推、并行计算、迭代计算等底层支撑技术,对各种数据源的查询加速。

功能特性:

  • 内置8种数据源,包括:Hive、Mysql、EleasticSearch、Mongo、Kafka、Hbase、Redis、Druid等。
  • XSQL采用数据源(DataSource)、数据库(Database)、数据表(Table)的三层元信息,为异构数据源提供了统一视图,进而实现了跨数据源的数据关联
  • SQL Everything,将程序与数据源具体版本解耦,程序迁移能力得到加强
  • 对DDL、DML、可下推查询,延迟与Yarn的交互及资源申请,进而提升效率并节省资源。
  • 相比很多开源分布式查询引擎,XSQL替换了Spark SQL,因而只需要一次SQL解析,避免多次解析带来的时延。
  • XSQL允许用户将聚合、过滤、投影等操作下推至数据源计算引擎,相比DataSet API更容易实现毫秒级响应。
  • XSQL借鉴了业内优秀的开源项目,放弃元数据的中心化,因此避免了数据同步、数据不一致,数据延迟等不利因素。XSQL也因此在部署上更加轻量、简便。
  • XSQL对元数据的缓存有两种级别,既能减少对底层数据源的压力,也提升了XSQL的执行效率。
  • XSQL可以按照用户需要,设置元数据白名单来避免缓存多余的元信息,进一步提升执行效率。
  • 可适配到Spark 2.x任意版本,解压即可运行,不需要引入额外依赖。且与原生SparkSQL隔离运行,不影响现有程序运行

111

Quicksql:

Quicksql是一款跨计算引擎的统一联邦查询中间件,用户可以使用标准SQL语法对各类数据源进行联合分析查询。其目标是构建实时\离线全数据源统一的数据处理范式,屏蔽底层物理存储和计算层,最大化业务处理数据的效率。同时能够提供给开发人员可插拔接口,由开发人员自行对接新数据源。

功能特性:

  • 支持8种数据源查询:Hive, MySQL, Kylin, Elasticsearch, Oracle, MongoDB, PostgreSQL, GBase-8s;
  • 支持Spark、Flink双计算引擎;
  • 支持基础CLI命令行查询和JDBC远程连接查询;
  • JDBC类型数据源可通过YAML配置快速接入,无需修改代码;
  • 提供方言/语法对接模板,支持用户对新数据源的语法自定义;
  • 提供元数据采集功能,批量拉取预置元数据;
  • 支持落地HDFS,支持可配置的异步响应机制

p1

执行流程图

333

实践过程

参考linkis官方文档《如何快速实现新的底层引擎》、《Spark引擎介绍》,然后在uejs/definedEngines下创建xsql模块进行相关开发。

功能点:

  • 1、支持按照不同集群加载相关配置
  • 2、支持自定义结果存储路径
  • 3、支持是否开启默认limit 5000限制保护
  • 4、linkis网关上socket支持token user认证。
  • 5、适配公司内部hadoop版本
  • 6、增加XSQL执行引擎

实现过程简述:

由于公司大数据是有多集群的,为了节省客户端资源,可以复用客户端提交任务到不同集群,这时就需要能够灵活指定不同集群的配置文件。
目前是将用到的集群配置文件放到linkis-hadoop-conf文件夹下,用于在启动引擎时以及己启动的执行引擎里进入动态加载。


├── client-viewfs.xml
├── core-site-cluster1.xml
├── hbase-site-cluster1.xml
├── hdfs-site.xml
├── hive-default.xml
├── hive-exec-log4j.properties
├── hive-log4j.properties
├── hive-site-cluster1.xml
├── ivysettings.xml
├── mapred-site-cluster1.xml
├── spark-defaults-cluster1.conf
├── xsql-spark-defaults-cluster1.conf
└── yarn-site-cluster1.xml

在linkis-gateway网关模块,改造让socket支持token user认证。比如在创建socket连接时可以通过传入token相关参数来完成用户认证。

ws://gateway.linkis.net:9001/ws/api/entrance/connect?Token-User=xxxx&Token-Code=BML-AUTH

{
	//这个地址也需要增加token参数
 	"method":"/api/rest_j/v1/entrance/execute?Token-User=xxx&Token-Code=BML-AUTH",
 	"data":{
		"params": {
			"variable":{
			},
			"configuration":{
				"special":{

				},
				"runtime":{

				},
				"startup":{
				}
			}
		},
		"executeApplicationName":"xsql",
		"executionCode":"SELECT * FROM abc limit 5;",
		"runType":"sql"
	}
}

由于业务实际查询时是需要全量数据,不需要进行limit限制。
而且是想根据每次请求中参数动态来设置是否需要Limit,而不是通过全局配置统一禁用还是开启。
业务需要自定义存储结果路径,比如跨集群跨账号存储查询结果。

以上是Linkis\ujes\entrance入口模块里进行参数接受处理。

XSQL执行引擎实现:

  • 目录结构

222

由于xsql是基于spark实现的。所以xsql执行引擎基本是复用了linkis spark引擎代码。

重点是修改如下:

主要涉及到linkis-ujes-xsql-engine 模块相关改动

  • pom.xml

<!--<spark.version>2.4.3</spark.version> -->
<!--把2.4.3修改为2.4.3.xsql-0.6.0 -->
<spark.version>2.4.3.xsql-0.6.0</spark.version>,

2.4.3.xsql-0.6.0这个版本请根据从开源xsql编译时获取,由于适配了公司内部,所以版本号可能略有不同。

SparkEngineExecutorFactory 类

override def createExecutor(options: JMap[String, String]): SparkEngineExecutor = {

    val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath
    SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) =>
      k.startsWith("spark.")
    }.foreach { case (k, v) =>
      conf.set(k, v)
      sys.props.getOrElseUpdate(k, v)
    }

}

def createSparkSession(outputDir: File, conf: SparkConf, addPythonSupport: Boolean = false): SparkSession = {


	//val builder = SparkSession.builder.config(conf)
    //builder.enableHiveSupport().getOrCreate()

	//划重点:将enableHiveSupport改成enableXSQLSupport()
	val builder = SparkSession.builder.config(conf)
    builder.enableXSQLSupport().getOrCreate()
}

SparkEngineExecutor 类

override protected def executeLine(engineExecutorContext: EngineExecutorContext, code: String): ExecuteResponse = Utils.tryFinally {

	//同样要增加加载配置代码段
	val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath
    SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) =>
      k.startsWith("spark.")
    }.foreach { case (k, v) =>
      sc.getConf.set(k, v)
      sys.props.getOrElseUpdate(k, v)
    }

}

如何使用

提交参数如下:

{
    "params":{
        "variable":{
        },
        "configuration":{
            "special":{
            },
            "runtime":{
                "clusterName":"cluster1",
                "configPath":"/usr/local/dss/linkis/linkis-hadoop-conf",
                "userName":"hadoop",
                "wds.linkis.yarnqueue":"hadoop",
				//可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限
                "resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei"
                //如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true,
                //否则不需要传这个参数,linkis则默认会进行limit 5000限制
                //"allowNoLimit" : true
            },
            "startup":{
                "clusterName":"cluster1",
                "configPath":"/usr/local/dss/linkis/linkis-hadoop-conf",
                "userName":"hadoop",
                "wds.linkis.yarnqueue":"hadoop",
				//可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限
                "resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei"
                //如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true,
                //否则不需要传这个参数,linkis则默认会进行limit 5000限制
                //"allowNoLimit" : true
            }
        }
    },
    "executeApplicationName":"xsql",
    "executionCode":"
		REMOVE DATASOURCE IF EXISTS mysql_connect_name;
		ADD DATASOURCE mysql_connect_name(type='mysql',url='jdbc:mysql://10.22.22.22:3306',user='root',password='123456',pushdown='false',useSSL='false',version='5.7.28');
		REMOVE DATASOURCE IF EXISTS hive_cluster1;
		ADD DATASOURCE hive_cluster1(type='hive',metastore.url='thrift://10.222.222.222:9083',user='test',password='test',version='1.2.1');
		SELECT t1.id,t1.name,t1.title,t2.time,t2.url,t2.partner,t2.m2 FROM (SELECT id,name,title,ip FROM mysql_connect_name.database_name.mysql_tables) t1 JOIN 
		(SELECT m,time,url,partner,ip FROM hive_cluster1.database_name.hive_tablse WHERE day = 20200903) t2 
		ON t1.ip=t2.ip order by t2.time;",
    "runType":"sql"
}

XSQL语法说明:

删除数据源时请使用这种语法 REMOVE DATASOURCE IF EXISTS 数据源名称; 避免直接REMOVE DATASOURCE 数据源名称, 因为上来就执行删除数据源,会因为找不到数据源来报异常。

查询的表名需要增加数据源以及数据库进行限定,要符合三段式表名。比如:hive_cluster1.database_name.table_name

第一段数据源名称,就是添加数据源语法时自定义的名称hive_cluster1,比如ADD DATASOURCE hive_cluster1(... ...)

第二段数据库名称,这个需要是真实的数据库,比如database_name

第三段表名,表要是第二段数据库下真实的表名。

更多XSQL使用语法,可以查看官方相关文档。https://qihoo360.github.io/XSQL/tutorial/syntax/

这样就可以实现mysql与hive数据进行关联查询了。

相关版本

hive 1.2.1

spark 2.4.3

linkis 0.9.3

xsql 0.6.0

java 1.8+

hadoop 2.7.2

相关资源

https://github.com/WeBankFinTech/Linkis

https://github.com/Qihoo360/XSQL

https://github.com/Qihoo360/Quicksql

【有奖征文】DSS在蔚来汽车的实践

1. 背景

蔚来汽车的数据平台Insight,经过三年多的发展,已经有了比较完善的各个组件和工具,可以完成数据采集、处理、分析、生成中间表,生成指标,形成报表,元信息管理、权限管理(Crystal)等等。很多人会疑问,为什么需要单独做一个一站式数据应用交互管理平台?

原因有以下几点:

  1. 能够提高用户的开发效率。目前数据平台的用户在进行业务开发、数据查询,要分别进入不同的组件进行操作,使用感觉上是比较割裂的,比如:查询元数据的时候,使用自研的Crystal工具;做数据分析时候,使用Zeppelin交互式查询平台;进行任务调度的时候,使用Airflow或者Oozie上去配置;下载HDFS上的数据的时候,需要使用hue去下载。因此我们认为,如果能有一个统一的一站式数据开发、分析、可视化的平台,可以降低用户使用Insight数据平台的成本,并起到一定的在内部推广Insight的作用。

  2. 能够将用户的所有脚本都管控起来,之后就可以通过代码扫描来监控代码质量,落实业务规范,以及合理化资源的使用情况等,甚至可以通过对脚本进行限制或者改写,来避免一些不合理的集群使用方式。

在今年下半年,经过一段时间的一站式数据应用交互管理平台的技术调研选型,我们最终确定以微众银行开源的DataSphere Studio作为Insight的一站式数据应用交互管理平台,并根据公司业务需要进行一些定制开发。

下图展示了目前Insight数据平台架构和DSS在其中的定位:

image2020-11-20_20-7-25

2. DSS与Airflow的结合

由于公司已经较大范围了使用了Airflow作为数据平台任务调度工具,但是DataSphere Studio目前只支持Azkaban的调度工具,尚未适配支持Airflow。

因次我们进行了一些二次开发,将DSS和Airflow初步结合了起来。修改的地方仅在DSS一个repo中,已经向社区提交了Pull request: https://github.com/WeBankFinTech/DataSphereStudio/pull/241/files

运行截图如下:

企业微信截图_3689aaf2-0672-41b1-ae7a-c5c80fe23613

企业微信截图_9e16da55-8759-4e05-85ad-64027edc3c6e

我们主要做了以下工作:

  1. 参照DSS已有调度模块dss-azkaban-scheduler-appjoint,开发了一个新的模块:dss-airflow-scheduler-appjoint。
  2. 参照plugins中azkaban的插件linkis-jobtype,开发了一个airflow worker上的运行client:linkis-airflow-client。

DSS和Airflow交互的总体架构图如图所示:

image2020-11-21_18-50-36

模块位置如下图所示:

image2020-11-20_16-44-50

2.1. dss-airflow-scheduler-appjoint

该模块主体也是一个SchedulerAppJoint类的实现,和dss-azkaban-scheduler-appjoint是替代关系。由于Airflow中没有project的概念,为了保留dss中project的概念,该模块实现方式如下:

  1. 发布一个project时候,将这个project中的每个flow都生成一个airflow的DAG的py文件,通过airflow的Restful API发布到Airflow中。这里做了一个优化,我们在提交前,会先从Airflow中获取Project下所有flow对应的DAG文件,筛选出变化的DAG文件进行发布。
  2. 删除一个project时候,将这个project中的每个flow对应的airflow的DAG的py,都通过Airflow的Restful API从Airflow中删除
  3. 删除单个flow的时候,需要通过Airflow的Restful API将flow对应的Airflow的dag删除
  4. 权限上,支持Airflow的LDAP和RBAC两种认证方式

2.2. linkis-airflow-client

  1. 对于Airflow一个Dag中的每个node,执行时候都调用linkis airflow client,它负责连接到linkis gateway中去执行具体任务(执行spark sql/datachecker等等),并获得gateway返回的任务日志进行输出。
  2. linkis airflow client是一个java进程,在Airflow的dag节点中,通过bash_operator进行调用。
  3. client的jar包需要提前部署在airflow的所有worker节点的相同路径中。

2.3. airflow-rest-api-plugin的扩展

通过API方式Airflow进行交互,有两种方式:(1)Github上的第三方开源Restful API实现,见 https://github.com/teamclairvoyant/airflow-rest-api-plugin (2)使用Airflow官方提供的实验性的Restful API: https://airflow.apache.org/docs/1.10.3/api.html 。经过调研,我们发现上述两种方式都缺少一些DSS和Airflow中所需要的Restful功能。考虑到第三方开源实现的代码。理解起来容易且便于扩展(比如一些Restful API本质上是去Airflow机器上执行文件系统操作,或者是调用Airflow的cli命令),因此在第三方开源实现上进行了一些扩展,加入一些新的功能,比如:

  1. 删除某个DAG文件
  2. 删除某个DAG在Airflow上的元信息
  3. 获得一个DAG文件的内容(base64编码)
  4. 获取某个DAG在某一次运行实例的日志

具体的扩展实现可以参考:https://github.com/sparklezzz/airflow-rest-api-plugin/blob/master/plugins/rest_api_plugin.py

2.4. 可以改进的地方

  1. schedulis是针对azkaban定制的,这部分没有做修改,所以只是直接指向airflow的web首页,当做一个内嵌的iframe使用,不带查看project的功能。如果需要,可以考虑使用Airflow的Restful API,开发一个简单的airflow日志查询和任务控制页面。
  2. linkis-airflow-client,目前配置的单个java进程实例内存占用限制在64MB,但多了的话还是会给airflow worker节点造成一定内存压力。可以考虑实现一个单进程的java服务,负责从各个linkis/dss服务获取任务运行日志,在airflow节点上常驻;一个DSS的Airflow任务开始调度时候,airflow woker和该java守护进程交互,从java守护进程中拉取linkis/dss的任务运行日志。
  3. dss-airflow-scheduler-appjoint和dss-azkaban-scheduler-appjoint是互斥的。

3. Linkis对Yarn Cluster,Kerboros的支持

3.1. 对Kerberos的支持

我们使用CDH 5.15版本, 其上的Hive、HDFS、Yarn都启用了Kerberos认证,并且CDH使用Sentry做Hive和HDFS的权限管理。

为了适配这种环境, 我们对Linkis相关模块进行修改,统一使用Hadoop的proxy user机制以DSS用户的身份来访问文件系统和提交Spark Job,这样不再需要给每个用户生成keytab。

由此,可以把用户由Linkis产生的私有数据都放在HDFS上,通过proxy user机制实现更方便的权限控制。

3.2. 对Yarn Cluster模式的支持

3.2.1. engine生命周期管理

在client模式下,engine本质上在使用端口号作为'id'。这一套机制的合理性建立在所有engine都和engine manager在同一台机器的前提下。

engine manager会使用当前机器的可用的端口作为engine的port,但是当engine位于另一台服务器上的时候,比如在spark的yarn cluster模式下,这个port不一定可用, 而且不同的engine实例的port可能是相同的。这就给engine的生命周期管理带来了一些问题。

我们对Linkis进行了改造:

  1. 使用一个单独的id来标识engine
  2. engine自己寻找本地可用端口
  3. engine在Eureka上注册元数据,以使得engine manager可以识别自己管理的engine
  4. 使用Yarn客户端来kill掉engine。

3.2.2. 部署

  1. 通过proxy user机制,以DSS用户的身份提交Spark程序和申请资源。
  2. 将Spark engine对Linkis的相关依赖打成一个jar包,并且shade掉与CDH自带版本相冲突的Guava库。

4. CICD落地实践

4.1. 问题

Linkis 和 DSS 这两个项目中包含大量的 maven module,我们内部用到了 11 个 Linkis module 和 8 个 DSS module。每个 module 对应一个 Spring 服务。虽然官方提供了安装和启动脚本,但是这两个脚本是面向整个项目的,不够灵活,也不够便捷。特别是我们对 Linkis 和 DSS 进行二次开发和测试的时候,如果只改了某个 module 的一部分代码,如何快速的部署并且测试改动的效果?如果采用官方的脚本,需要对整个项目的所有module编译并打包,然后部署,效率非常低。

4.2. 方案

为了解决上述问题,我们设计了一个 module 级别 CICD 的流程,方便开发高效测试和部署代码。具体流程是:开发本地修改代码 -> GitLab -> Jenkins -> 服务器。

4.3. 实现

我们梳理了 Linkis 和 DSS 两个项目的 module,参考了官方的部署和启动脚本,将每个 module 编译、打包、部署、根据环境下发配置参数和启动步骤进行拆分,通过脚本实现了 module 级别的自动化部署。

image2020-11-20_15-34-22

4.4. 效果


image2020-11-20_15-34-50

4.5. 待解决的问题

  1. module之间也有依赖关系,有时候修改一处代码,可能需要部署多个 module,还无法做到智能化。
  2. module级别k8s部署

5. 未来计划

后续我们对DSS有以下的二次开发计划:

  1. 前端页面的定制化改造,以适配公司业务分析人员的特定需要
  2. 更加便捷的script的版本管理功能
  3. 二进制数据结果下载功能

同时未来也会把一些较为通用的有价值的Feature贡献回社区,希望其他使用DSS的公司和个人能够从中受益。

【有奖征文】DSS+Linkis的使用情况

一、使用背景

  • dss+linkis是国内优秀的开源项目,感谢微众的小伙伴们的贡献和一直一来的热心指导。

  • 我司的大数据平台,包括数据集成、数仓、元数据、数据质量、统一调度、可视化、API开放等。但是比较大的缺憾是没有数据开发模块,在没有dss+linkis之前,我司都是使用hue开发脚本,没有统一的界面进行开发维护,也很难与现有的产品体系集成。

  • 自从接触到dss+linkis,和其他相关产品进行比较,感觉dss+linkis非常棒,非常适合我们,所以一直研究如何在我司现有产品体系中使用。

二、使用情况
目前阶段处于初步引入以及修复使用问题阶段。

  • 引入数据开发Scripts模块,与我司现有的统一认证进行集成,并把该模块整合到现的大数据平台体系中,作为单独的数据开发模块;

  • 数据开发Scripts管理的脚本与其他模块打通,如与现有调度中心模块、工作流开发模块集成;

  • 使用linkis作为我们数仓适配层,数仓上层所有的模块通过linkis与数仓交互,上层应用直接通过API接口与linkis交互,无需注意底层的技术细节;

  • 我司使用的环境是CDH6.0.1版本,根据使用重新编译后,有少量jar冲突需要手工处理;

  • 目前在初步接入阶段,在QC测试及使用的过程还是有不少问题需要去摸索、解决,希望在后续不断熟悉的基础上不断引入dss+linkis更多优秀的模块。

三、期待的功能与改进

  • 各个功能模块职责清晰,各模块之间独立性高一些,因为DSS中包括很多模块,但是在有些场景下只需要集成其中一个模块

  • linkis的各个引擎第一次执行时,启动时间较长,期望能有改进;

  • linkis对通用算法库的支持,如spark Mlib;

  • 可以有界面对用户和数据权限进行统一管理;

  • 目前配置和服务均较多,期望能简化方便运维管理

  • 增加加实时计算方面的支持;

  • 能增加元数据方面的支持;

【有奖征文】云原生会给linkis带来哪些技术红利

余额宝从0到亿级用户的发展历时半年,到2020年疫情期间疫情状况展示应用,其用户规模从0到亿级只用了1周。这看似不可能的成绩归功于云原生,业务中台,及数据中台3大红利,其中linkis+dss作为大数据开发套件,是数据中台建设的一个重要组件,在数据开发过程中如何保证各种spark,hive,flink任务 100%成功,在数据生产阶段如何规划资源调度,最大限度利用资源,保证数据准时生产出来。那云原生又会给linkis带来什么样的技术红利呢,接下来介绍linkis+dss在艾佳生活的实践。
kubernetes,isitio,knative做为云原生操作系统及各微服务生产部署的技术底座,不仅是各大云厂商的发力点,也是中小型公司弯道超车的机会。如图所示,kubernetes可以屏蔽微服务对各种计算资源(cpu,gpu,tpu),存储资源(nfs,ceph,minio)等底层资源的感知,基于DNS做服务发现,通过fqdn及可找到对应的微服务,极大屏蔽了语言等差异性(如java微服务调用go,python)。
image

istio解决连接,安全,流量控制及可观测行等问题,3个月一个版本的更新周期。istio可以降低微服务部署的复杂性,并减轻开发团队的压力,是一个开源的服务网格,作为透明的一层接入到现有的分布式应用程序里。istio也是一个平台,拥有可以集成任何日志、遥测和策略系统的 API 接口。Istio 多样化的特性使各公司能够成功且高效地运行分布式微服务架构,并提供保护、连接和监控微服务的统一方法
image

image

knative可以实现无服务化,autoscale到0特性。让开发人员关注代码的运行而不需要管理任何的基础设施。程序代码被部署在诸如AWS,阿里云等平台之上,通过事件驱动的方法去触发对函数的调用。其技术特点包括了事件驱动的调用方式,以及有一定限制的程序运行方式,如AWS Lambda的函数的运行时间默认为3秒到5分钟。从这种架构技术出现的两年多时间来看,这个技术已经有了非常广泛的应用,例如移动应用的后端和物联网应用等。简而言之,无服务器架构的出现不是为了取代传统的应用。然而,从具有高度灵活性的使用模式及事件驱动的特点出发,开发人员/架构师应该重视这个新的计算范例,它可以帮助我们达到减少部署、提高扩展性并减少代码后面的基础设施的维护负担。
image

通过容器化交付linkis微服务制品,可以省去在各个公司jar包冲突,操作系统依赖,jdk版本依赖等问题的重复解决。大家在运维各种项目的时候可能经常遇到这样的问题,我在sit环境是没有任何问题的,为什么一到生产就会有各种莫名其妙的问题,可能是因为操作系统,硬件架构差异导致,如何避免这种问题的发生,保证sit,uat,stage环境的一致性,docker其一次编译,随处运行的特性不仅可以解决上诉问题,还可以极大的节省部署时间。
kubernetes在屏蔽底层资源差异,弹性扩缩容,健康检查,故障自愈等方面会解决linkis运维过程中的很多痛点,其内建资源deployment可以保证每个微服务按期望数量运行,如果发成oom,单机宕机等问题,会在其他服务器自动重启一个pod,完成故障自愈。下图是艾佳生活linkis混合云部署部署架构图,在3月份完成右侧架构升级,通过WAF,SLB做公网流量入口,解决安全,防DDos,cc攻击等,通过SLB解决高并发流量,同region跨zone多活问题,通过pvc挂载oss存储,10个9的可靠性保证存储的高可用,6线BGP及cdn加速保证海量数据读取的时效性

image

image

最后,我们在明年会规划开发linkis-operator,解决linkis-enginemanager管理linkis-engine资源的问题,完成linkis-engine可以无上限创建。

readme文件应该更新

wds已经增加了很多新模块,建议文档也加入这些新模块的介绍和索引。

翻译领取:

下面是需要翻译的文章链接,领取翻译任务的小伙伴可以在链接后@自己,即认为该翻译任务已被领取。该翻译任务结束后,可以将该翻译任务的链接转移到已完成/Done中,并附上翻译后发布的文章链接。
https://zhuanlan.zhihu.com/p/509745572 @李扬

【有奖征文】DSS数据治理实践

一,使用背景

公司业务线较多,有景区智慧一体化管控平台的业务线,以省市县数据中心为主的数据中心业务线,以智能小程序切入的面向C端游客的业务线,面向游客的ots业务线等,目前准备构建一套可以打通各个业务线的数据中台,经过前期调研,发现微众的DSS + Linkis平台可以完美融入我们的数据中台架构,故决定搭建该平台。

二,使用功能亮点

微众DSS + Linkis打通了数据治理的闭环,数据交换-数据分析-数据使用-可视化报表展示整个链路可以满足大部分数据治理的需求,特别是scriptis脚本分析,融合了spark-sql,Hsql,scala脚本,python脚本,shell脚本,功能丰富且强大,页面风格也很清晰。脚本执行出的分析内容可以直接下载Excel或者数据报表,省去了之前很多数据导来导去的工作量。另外使用中发现一个额外功能:工作流结合调度基本可实现数据清洗的功能。

三,使用展望

希望之后可以开放数据api服务的功能,目前我们已有类似的需求,打算通过Visulis的试图暴露api出去给外部系统查询使用,可能有一定的改造工作量。

四,使用感受

对环境和版本要求较高,建议使用前可以先咨询微众开源社区人员,根据自己的spark,hadoop,hive环境和版本选择合适的DSS+Linkis编译版本。在搭建项目过程中遇到了不少困难,感谢微众小伙伴的大力支持,特别感谢杨峙岳和饶进阳开发同学的帮助,在周末休息的情况下基本上全天都在帮我解决问题,十分感谢!

【待认领】Meetup 03期活动回顾推文

活动详情:《Meetup预告:Linkis新版本介绍以及DSS的应用实践》

【需求描述】
撰写本期活动回顾的推文,文章内容需要包含以下关键要素:
1、活动主题、时间、内容简要;
2、讲师的姓名、公司、职位、分享的主题;
3、每个主题的精华概要+材料截图;

【活动素材】
1、活动视频;
2、两位讲师的PPT;
链接: https://pan.baidu.com/s/1ocrDERTFNrY64Ckk4vUboQ?pwd=pixy 提取码: pixy

【参考资料】
1、Meetup 02期回顾:Qualitis、Prophecis、DSS、Schedulis四大组件介绍
2、活动回顾 | Linkis新版本的特性介绍以及使用实践
3、【活动回顾】这场开源大数据盛会,我们把所有演讲内容浓缩成10分钟精华

【任务Tips】
1、使用飞书妙记,把视频中老师的语音转化为文字;
2、摘录老师分享的重要内容或者经典内容,再配图PPT;

完成时间:6月15日(内容提交给Andy验收)
推文时间:6月17日

如有任何疑问,请联系Andy

【有奖征文】有酒吗?上两斤二锅头

应用场景:

本人是世界XXXL大厂的xxxxxxxx...s组小组组长一名,无头衔。迫于生计,去年开始陆续出去接客,接活。
作为没见过大世面的搬砖小工头,见到客户,只会小声讲我们的产品能做数据的离线处理。没想到客户张口从叙利亚问题谈到美国总统大选,彷佛我们需要交付的特性直接关乎了世界和平。

经过几番周折和理解,基本上搞清了客户对于数据处理的主要诉求:

  • 拖拉拽。
  • 一键式。
  • 智能。
  • 安全,安全,安全生产!
  • 明天能上线吗?

本人之前是接触过hue的,但是因为是java出身,没有用python做过工程,怕表演砸了饭碗,所以直接放弃了用Hue糊弄一下的想法。
在度娘上徘徊了几圈,经过几番周折,终于从谷歌上搜到了托管在微软的github上的webank开源的linkis,最终也锁定了DSS。

解决的问题:

1 标配“拖拉拽”
拖拉拽
2 “轻松”一键式
1)界面上的一键开始,看图不解释。
开始
2)安装部署的一键式

  1. 容器化后各个服务通信问题,注册到eurka上的示例通过ip加port方式。

  2. 共同文件目录统一挂载,保证各个服务关键日志能被拿到。

  3. hive spark客户端等配置文件统一挂载至容器外,方便因环境变化的修改。

  4. 升级hive,hadoop的套件的版本,这里略去了一万字,最后结果还不错。

  5. spark回调driver的问题。

  6. 容器化以后读日志文件死锁问题。

  7. dss容器化,quality容器化,schediuls容器化....
    差点把大数据集群也搬进容器内了(捂脸).....

    折腾了小半年以后,小有收获,结果和微众银行的专家沟通后发现,开源版容器化即将发布...
    终于理解了习大大提倡的开放共享是多么重要,技术既要内循环也要外循环。

    容器化

3 语法智能提示,日志在线查看,看图不说话,用户想要的就是我们想要的。
语法提示

4 不能出安全事故啊
1)元数据安全
开源出来的版本,metadata元数据例如hive,只支持jdbc查询方式,说白了要用户名密码,被用户无情拒绝。
我们也主要应对场景也主要分三个场景:
1. 我们自己的大数据集群,可以采用jdbc方式查询元数据。
2. 适配kerberos,用HiveMetaStoreClient方式查询。
3. 适配 某为 大数据平台用的是webHcat。
代码
2)主数据安全方式
由于开源版本有租户概念,可以很好的做到用户安全访问,这里不过多描述。
Todo:引入Ranger
3)dss多系统https访问
产品没买安全证书,访问chrome还要点一下高级。dss多个系统是通过iframe嵌入的,为了不让上帝再多点一次“继续浏览不安全网址”,
我们采用通过二次代理方法解决。

ngnix配置
为了多做项目,考了安全考试,签了涉密岗位,从此出国除了考虑新冠肺炎还要考虑信息安全,做项目不容易,客户安全了,我们才能稳定。
5 明天能上线吗?
能,但是,哥,能把标书让我们先给咱写了吗,这脏活累活就交给我们干吧。

最佳实践&价值:

由于项目涉及太多,不便多说,我简单文字描述方案:
1) 客户需求: 某市政务相关项目,统计各个局办事效率的排名。
2) demo演示: 数据开发编写脚本,数据可视化生成报表,数据流组织业务,scheduls调度起来,整个流程拉通且可视化。
3) 结果:拿下项目

感受:最好的实践不一定是技术,而是能拿下项目,帮客户成功,让项目组的兄弟有肉吃,有劲干。

【有奖征文】DSS使用分享

一、使用背景
可能和其它公司不太一样,我们公司主要是以接项目开发为主,这二年数据治理、数据分析方面的项目越来越多,大公司的项目本身就有数据开发平台,我们可以直接使用,用过的有阿里的Datawork,华为的dayu,星环的,有些没有数据没有数据开发平台我们都使用这个单一的开源软件 zeppelin、cdh、dolphine、帆软等也能完成任务,这样对于做项目投入成本很高,所有部门解决自己开发一个数据平台,正好这个时候看到dss+linkis这个是国内优秀的开源项目,在这里先感谢微众的小伙伴们的贡献和一直一来的热心指导,基本可以满足我们的要求,DSS组件化的设计也是我们需要的,经过测试后,就选定在此基础上做二次开发。

二、使用情况
现阶段还处于整合阶段,DSS有些功能直接可以满足我们需求,有些功能还需要我们改造,现在的改造地方如下:
image
1)整合公司现有的系统管理平台也现实用户权限的管理。
2)整合公司现在的编目系统
3)整合dolphinscheduler调度平台
4)整合公司BI系统

三、期待的功能与改进
1、元数据管理
2、数据API共享
3、执行SQL时(初次或者一段时间后会重新启动引擎)效率有待提高
4、数据开发那边没有办法直接引用资源

再次感谢微众的小伙伴们的贡献和一直一来的热心指导。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.