apache / shardingsphere-elasticjob

Distributed scheduled job

License: Apache License 2.0

Java 99.89% Batchfile 0.06% Shell 0.05%
scheduled-jobs cron job job-management quartz elasticjob database shard middleware

shardingsphere-elasticjob's Introduction

Official website: https://shardingsphere.apache.org/elasticjob/

Stargazers over time

Through flexible scheduling, resource management, and job management, ElasticJob provides a distributed scheduling solution suitable for Internet scenarios, and its open architecture design offers a diversified job ecosystem. It uses a unified job API across projects, so developers only need to code once and can deploy anywhere.

ElasticJob became an Apache ShardingSphere sub-project on May 28, 2020.

You are welcome to communicate with the community via the mailing list.


Introduction

With ElasticJob, developers no longer need to worry about non-functional requirements such as job scale-out and can focus on business logic. At the same time, it frees operators from worrying about high availability and job management: capacity can be scaled simply by adding servers.

It is a lightweight, decentralized solution that provides distributed task sharding services.

ElasticJob Architecture

Features

  • Elastic Schedule

    • Support job sharding and high availability in distributed systems
    • Scale out for better throughput and efficiency
    • Job processing capacity scales flexibly with the allocated resources
  • Resource Assignment

    • Execute jobs at the right time and on the assigned resources
    • Aggregate the same job onto the same job executor
    • Dynamically append resources to newly assigned jobs
  • Job Governance

    • Failover
    • Misfire handling
    • Self-diagnosis and recovery when the distributed environment is unstable
  • Job Dependency (TODO)

    • DAG-based job dependency
    • DAG-based job item dependency
  • Job Open Ecosystem

    • Unified job API for extension
    • Support for rich job type libraries, such as dataflow, script, HTTP, file, and big data
    • Business-focused SDK that works with Spring IoC
  • Admin Console

    • Job administration
    • Job event trace query
    • Registry center management

Environment Required

Java

Java 8 or above required.

Maven

Maven 3.5.0 or above required.

ZooKeeper

ZooKeeper 3.6.0 or above required. See details

shardingsphere-elasticjob's People

Contributors

cocodroid, dependabot[bot], dongzl, fushun620, guangyun1013, gzdzss, hanahmily, haocao, jiang2015, kekerzzz, kimmking, linghengqian, luky116, menghaoranss, nmyphp, seanwan1989, strongduanmu, sunkai-cai, taojintianxia, technoboy-, terrymanu, teslacn, tristazero, tuohai666, viviel, wgy8283335, wubingting, wwj-go, zhangxinguo, zhaoyuguang


shardingsphere-elasticjob's Issues

Class not found

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/curator/framework/recipes/cache/TreeCache

This class cannot be found; is the version of the Curator dependency wrong?

A job paused from the console cannot be resumed

Problems:
1. After pausing a job in the console, clicking resume does not make the current job run again.
2. After pausing a job in the console, resetting the job's execution time does not make it run again either.
Cause:
Once the stop property in AbstractElasticJob has been set to true, it is never updated again when the job state changes.
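The fix direction described above can be illustrated with a minimal sketch; the class and method names below are illustrative and are not the actual AbstractElasticJob internals:

// Illustrative sketch only: the point is that the stop flag must be cleared
// again when the job is resumed, otherwise the trigger fires but execution bails out.
public abstract class PausableJob {

    // volatile so the scheduler thread sees changes made by the console listener thread
    private volatile boolean stopped;

    public final void execute() {
        if (stopped) {
            return;            // paused: skip this trigger
        }
        doExecute();
    }

    public void stop() {
        stopped = true;        // called when the console pauses the job
    }

    public void resume() {
        stopped = false;       // the missing step: clear the flag on resume
    }

    protected abstract void doExecute();
}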

Inaccurate IP address resolution in complex network environments

Where the problem occurs
Project: elastic-job-core, class: com.dangdang.ddframe.job.internal.env.RealLocalHostService, method: getLocalHost().

Note: in an ordinary network environment, the code InetAddress.getLocalHost() correctly returns the local machine's IP.
In a complex corporate environment, however, network administrators may carve out department and team subnets that are eventually linked into the main corporate router,
so a single workstation can end up with several internal IPs. By default, InetAddress.getLocalHost() returns the subnet IP assigned by the router on the nearest network topology path.

In summary, the IP obtained from InetAddress.getLocalHost() may not match the IP that is actually used to connect to ZooKeeper.
It is easy to imagine that wrongly treating the subnet router IP as the ZooKeeper connection IP will cause machines in the same subnet to be identified incorrectly when they connect to ZooKeeper.

Suggested code change: https://github.com/amao12580/RSSReader/blob/master/core.base/src/main/java/per/rss/core/base/util/IPUtils.java
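A minimal sketch of the interface-scanning approach the linked IPUtils takes: walk the network interfaces and prefer a non-loopback, site-local IPv4 address over whatever InetAddress.getLocalHost() resolves. The selection rule (site-local IPv4 first) is an assumption; which address should win depends on the deployment.

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Enumeration;

public final class LocalIpResolver {

    // Prefer a non-loopback, site-local IPv4 address over InetAddress.getLocalHost(),
    // which may return the address of whichever subnet the OS resolves the hostname to.
    public static String resolve() throws SocketException {
        Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
        while (interfaces.hasMoreElements()) {
            NetworkInterface nic = interfaces.nextElement();
            if (nic.isLoopback() || !nic.isUp()) {
                continue;
            }
            Enumeration<InetAddress> addresses = nic.getInetAddresses();
            while (addresses.hasMoreElements()) {
                InetAddress address = addresses.nextElement();
                if (address instanceof Inet4Address && address.isSiteLocalAddress()) {
                    return address.getHostAddress();
                }
            }
        }
        // Fall back to the JDK default if no suitable interface was found.
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException ex) {
            return "127.0.0.1";
        }
    }
}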

fetchData returns data, but processData is never executed

ElasticJob version 1.0.2

Log

First fetch that returned data matching the criteria:
[03-01 19:29:00,126] INFO  com.qianqian.elasticjob.ActCheckTimer.fetchData(ActCheckTimer.java:43): 活动审核中定时任务开始...... 分片序列号--0--FetchDatSize--1
............
[03-01 19:53:00,119] INFO  com.qianqian.elasticjob.ActCheckTimer.fetchData(ActCheckTimer.java:43): 活动审核中定时任务开始...... 分片序列号--0--FetchDatSize--1
[03-01 19:54:00,111] INFO  com.qianqian.elasticjob.ActCheckTimer.fetchData(ActCheckTimer.java:43): 活动审核中定时任务开始...... 分片序列号--0--FetchDatSize--1
............
After restarting the scheduler:
[03-01 19:55:00,830] INFO  com.qianqian.elasticjob.ActCheckTimer.fetchData(ActCheckTimer.java:43): 活动审核中定时任务开始...... 分片序列号--0--FetchDatSize--1
[03-01 19:55:00,840] DEBUG com.framelib.aop.DBSourceAspect.doBefore(DBSourceAspect.java:36): switch dataSource to MINE_01 success
[03-01 19:55:00,850] DEBUG org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366): Creating new transaction with name [com.qianqian.service.impl.PrdServiceImpl.modifyActCheck]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT,-java.lang.Exception
[03-01 19:55:00,851] DEBUG org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:204): Acquired Connection [com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@de91d1d] for JDBC transaction
[03-01 19:55:00,867] DEBUG org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:221): Switching JDBC Connection [com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@de91d1d] to manual commit
[03-01 19:55:00,877] DEBUG org.apache.ibatis.logging.slf4j.Slf4jImpl.debug(Slf4jImpl.java:47): ooo Using Connection [com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@de91d1d]
[03-01 19:55:00,877] DEBUG org.apache.ibatis.logging.slf4j.Slf4jImpl.debug(Slf4jImpl.java:47): ==>  Preparing: UPDATE prd_product_activity t SET t.status = ? WHERE t.id = ? AND t.end_status = 0 
[03-01 19:55:00,879] DEBUG org.apache.ibatis.logging.slf4j.Slf4jImpl.debug(Slf4jImpl.java:47): ==> Parameters: 23(Integer), 3482933011006518(Long)
[03-01 19:55:01,20] DEBUG org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753): Initiating transaction commit
[03-01 19:55:01,20] DEBUG org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:264): Committing JDBC transaction on Connection [com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@de91d1d]
[03-01 19:55:01,25] DEBUG org.springframework.jdbc.datasource.DataSourceTransactionManager.doCleanupAfterCompletion(DataSourceTransactionManager.java:322): Releasing JDBC Connection [com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@de91d1d] after transaction
[03-01 19:55:01,25] DEBUG org.springframework.jdbc.datasource.DataSourceUtils.doReleaseConnection(DataSourceUtils.java:332): Returning JDBC Connection to DataSource
[03-01 19:55:01,25] DEBUG com.framelib.aop.DBSourceAspect.doAfter(DBSourceAspect.java:53): switch dataSource to CLUSTER_01 success
The task is then executed correctly:
[03-01 19:55:01,28] INFO  com.qianqian.elasticjob.ActCheckTimer.processData(ActCheckTimer.java:55): 活动审核中任务执行...... 分片序列号--0--活动ID--3482933011006518
[03-01 19:55:01,33] DEBUG com.framelib.aop.DBSourceAspect.doBefore(DBSourceAspect.java:36): switch dataSource to MINE_01 success
[03-01 19:55:01,39] DEBUG org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:110): Fetching JDBC Connection from DataSource
[03-01 19:55:01,39] DEBUG org.apache.ibatis.logging.slf4j.Slf4jImpl.debug(Slf4jImpl.java:47): ooo Using Connection [com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@de91d1d]
[03-01 19:55:01,40] DEBUG org.apache.ibatis.logging.slf4j.Slf4jImpl.debug(Slf4jImpl.java:47): ==>  Preparing: SELECT t1.`type`,t1.`status` AS prdStatus,t1.`product_code`,t1.`version`,t2.`id` AS actId,t2.`on_time` FROM prd_product t1,prd_product_activity t2 WHERE t1.`product_code` = t2.`product_code` AND t1.`version` = t2.`version` AND t1.`show_flag` = 1 AND t2.`status` = 22 AND t2.`on_time` <= DATE_ADD(DATE_SUB(NOW(),INTERVAL '7' DAY),INTERVAL '00:01:00' HOUR_SECOND) AND t2.id%?=? limit ? 
[03-01 19:55:01,41] DEBUG org.apache.ibatis.logging.slf4j.Slf4jImpl.debug(Slf4jImpl.java:47): ==> Parameters: 1(Integer), 0(Integer), 40(Integer)
[03-01 19:55:01,46] DEBUG org.springframework.jdbc.datasource.DataSourceUtils.doReleaseConnection(DataSourceUtils.java:332): Returning JDBC Connection to DataSource
[03-01 19:55:01,46] DEBUG com.framelib.aop.DBSourceAspect.doAfter(DBSourceAspect.java:53): switch dataSource to CLUSTER_01 success
[03-01 19:55:01,47] INFO  com.qianqian.elasticjob.ActCheckTimer.fetchData(ActCheckTimer.java:43): 活动审核中定时任务开始...... 分片序列号--0--FetchDatSize--0

Code

@Override
public List<ProductTimer> fetchData(JobExecutionSingleShardingContext shardingContext) {
    List<ProductTimer> timers = prdService.selectActCheck(getDefaultParaMap(shardingContext));
    // Logs "activity-review scheduled task started" with the sharding item and the number of fetched rows
    log.info("活动审核中定时任务开始...... 分片序列号--" + shardingContext.getShardingItem() + "--FetchDatSize--" + getFetchDatSize(timers));
    return timers;
}

@Override
public boolean processData(JobExecutionSingleShardingContext shardingContext, ProductTimer prdTimer) {
    try {
        prdService.modifyActCheck(prdTimer);
    } catch (Exception e) {
        // Logs "activity-review task failed" with the sharding item and the activity ID
        log.error("活动审核中任务执行异常...... 分片序列号--" + shardingContext.getShardingItem() + "--活动ID--" + prdTimer.getActId(), e);
        return false;
    }
    // Logs "activity-review task executed" with the sharding item and the activity ID
    log.info("活动审核中任务执行...... 分片序列号--" + shardingContext.getShardingItem() + "--活动ID--" + prdTimer.getActId());
    return true;
}

@Override
public boolean isStreamingProcess() {
    // Streaming mode: fetchData keeps being called until it returns no more data
    return true;
}

Configuration

     <job:bean id="product_checkJob" class="com.qianqian.elasticjob.ActCheckTimer" 
        regCenter="regCenter" 
        cron="0 0/1 * * * ?"
        fetchDataCount="40"
        shardingTotalCount="1"
        shardingItemParameters="0=A"
        overwrite="true"
        description="活动审核中"/>

Allow customized thread pool configuration for DataFlow jobs

Previously, DataFlowElasticJob used newCachedThreadPool, which could spawn too many threads and overwhelm the executor.

A getExecutorService method has now been added to the DataFlowElasticJob interface; users can override it to supply their own ExecutorService. The default remains newCachedThreadPool.
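A sketch of the customization described above, assuming a DataFlow job class that overrides the getExecutorService method mentioned in this change; the surrounding class and pool size are illustrative:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: a real job would extend one of the DataFlow base classes and
// override the getExecutorService() method described above.
public class BoundedPoolDataFlowJob {

    public ExecutorService getExecutorService() {
        // A fixed pool caps concurrency instead of newCachedThreadPool(), which can
        // spawn an unbounded number of threads under sustained load.
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }
}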

Jobs cannot guarantee success globally

A node can only track or guarantee the transactionality of the shards it executes itself; there is no global control, so some data operations may fail while others succeed. Suggestion: before each shard runs and after all shards have completed, check ZooKeeper for failed nodes; if any exist, call a rollback callback on the job so that the shards processed earlier can be rolled back. Please consider adding a rollback-related callback to the job.

Fix the deadlock that can occur when sharding and leader election happen at the same time

A deadlock may occur when sharding and leader election happen concurrently.

  1. Adjust the condition checks during sharding to reduce the chance of the sharding lock being held forever in a multi-threaded environment.
  2. Put the shard-assignment step and the clearing of the sharding flag into the same transaction, to keep the data consistent and to prevent a deadlock caused by the flag not being cleared after a successful sharding.
  3. Put the sharding clean-up step into the same transaction to keep the data consistent.

Large scheduling delays can appear between nodes

Each node's jobs are scheduled locally by Quartz, so if a node's clock differs significantly from the leader's or from other nodes', faster nodes keep blocking while they wait for the leader to finish sharding, while slower nodes never get around to executing their shards. When global data consistency is required, this easily causes transaction timeouts or makes consistency impossible to achieve. Suggestion: synchronize clocks when a node starts, for example using the leader node's time as the baseline and synchronizing whenever the difference exceeds a certain range.
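One hedged way to detect such skew without relying on NTP is to compare the local clock against the ZooKeeper server clock, for example by touching a znode and reading its modification time. The znode path and tolerance below are illustrative, not elastic-job configuration:

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;

public final class ClockSkewCheck {

    private static final String TIME_NODE = "/systemTime/current";   // illustrative path
    private static final long MAX_DIFF_MILLIS = 60_000L;             // illustrative tolerance

    public static void check(CuratorFramework client) throws Exception {
        if (client.checkExists().forPath(TIME_NODE) == null) {
            client.create().creatingParentsIfNeeded().forPath(TIME_NODE);
        }
        long before = System.currentTimeMillis();
        // Writing to the node makes ZooKeeper stamp it with the server's clock (mtime).
        Stat stat = client.setData().forPath(TIME_NODE, new byte[0]);
        long serverTime = stat.getMtime();
        long localTime = (before + System.currentTimeMillis()) / 2;  // midpoint to offset round-trip latency
        if (Math.abs(serverTime - localTime) > MAX_DIFF_MILLIS) {
            throw new IllegalStateException("Clock skew against the registry center exceeds tolerance");
        }
    }
}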

Add pre- and post-job tasks

Some jobs, such as periodic data synchronization between systems, need to clear the existing data first and then import the new data, and the preparation step must run exactly once (environment preparation before execution). There is currently no way to do this. Suggestion: have the leader node invoke a corresponding callback, such as init, before or after it assigns the shards.
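For illustration, a hypothetical callback shape matching this request; the interface and method names are assumptions, not an existing elastic-job API:

// Hypothetical callback shape: the leader would invoke beforeAllShards() exactly once
// before assigning shards (e.g. to truncate old data) and afterAllShards() once after
// every shard has completed (e.g. to swap in the freshly imported data).
public interface JobLifecycleCallback {

    void beforeAllShards();   // runs once, on the leader, before sharding

    void afterAllShards();    // runs once, on the leader, after all shards finish
}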

Provide a sharding strategy that orders servers by the odd/even hash of the job name

Provide the com.dangdang.ddframe.job.plugin.sharding.strategy.OdevitySortByNameJobShardingStrategy sharding strategy.
It sorts the server IPs in ascending or descending order depending on whether the hash of the job name is odd or even.

If the hash of the job name is odd, IPs are sorted in ascending order.
If the hash of the job name is even, IPs are sorted in descending order.
This spreads the load of different jobs evenly across different servers (see the sketch after the examples below).
For example:

  1. With 3 servers and 2 shards, if the hash of the job name is odd, each server gets: 1=[0], 2=[1], 3=[]
  2. With 3 servers and 2 shards, if the hash of the job name is even, each server gets: 3=[0], 2=[1], 1=[]
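The ordering rule can be sketched as follows; this is an illustration of the described behaviour, not the actual OdevitySortByNameJobShardingStrategy source:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the idea: the parity of the job name's hash decides whether the server
// list is walked in ascending or descending IP order before shards are assigned,
// so different jobs spread their load across different servers.
public final class OdevityOrderSketch {

    static List<String> orderServers(List<String> serverIps, String jobName) {
        List<String> ordered = new ArrayList<>(serverIps);
        Collections.sort(ordered);                // ascending IP order
        if (jobName.hashCode() % 2 == 0) {
            Collections.reverse(ordered);         // even hash: descending order
        }
        return ordered;                           // shards are then assigned in this order
    }
}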

When isMonitorExecution=true, waiting for backlogged executions should support a timeout

When isMonitorExecution=true is set:

while (executionService.hasRunningItems()) {
    log.debug("Elastic job: sleep short time until other job completed.");
    BlockUtils.waitingShortTime();
}

If an execution blocks for some reason, subsequent triggers still fire one after another and pile up waiting; in the worst case this could end in a StackOverflowError.
An optional wait timeout should be provided: once it is reached, the loop should exit and an alarm should be raised.
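A sketch of the suggested change, reusing the names from the snippet above (executionService, log, BlockUtils); the timeout value and the alarm hook are placeholders:

long timeoutMillis = 30 * 60 * 1000L;
long deadline = System.currentTimeMillis() + timeoutMillis;
while (executionService.hasRunningItems()) {
    if (System.currentTimeMillis() > deadline) {
        // Raise an alarm / metric here instead of queueing up yet another waiter.
        log.warn("Elastic job: previous execution still running after timeout, stop waiting.");
        break;
    }
    log.debug("Elastic job: sleep short time until other job completed.");
    BlockUtils.waitingShortTime();
}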

A job that throws a runtime exception never runs again

Hi, I ran into a few problems while trying this out:

1. If a job throws a runtime exception, it never runs again. The console cannot monitor it either; the status stays RUNNING.

2. The job description does not seem to support Chinese.

3. Clicking the job update button does nothing (latest Chrome).

4. It would also be nice if the console showed the job's cron expression.

Use a Disruptor-style ring buffer producer/consumer model to process dataFlow more efficiently

The current dataFlow processing uses a blocking, batch-by-batch model:
loadData->
->DataExecutionThread1
->DataExecutionThread2
->DataExecutionThread3
->...
waiting for execution complete
...
next round loadData->
->DataExecutionThread1
->DataExecutionThread2
->DataExecutionThread3
->...

This blocking-read model does not perform well for fast bulk data processing; a producer/consumer mode should be available as an option.
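A hedged sketch of the producer/consumer shape being proposed, using a plain BlockingQueue in place of a Disruptor ring buffer; fetchBatch and process stand in for the job's fetchData/processData, and all names here are illustrative:

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

public final class PipelinedDataFlowSketch<T> {

    private final BlockingQueue<T> buffer = new ArrayBlockingQueue<>(1024);
    private final ExecutorService consumers = Executors.newFixedThreadPool(4);

    public void run(Supplier<List<T>> fetchBatch, Consumer<T> process) {
        // Consumers drain the buffer continuously instead of waiting for a whole batch to finish.
        for (int i = 0; i < 4; i++) {
            consumers.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        process.accept(buffer.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Producer keeps fetching; it only blocks when the buffer is full, not on batch completion.
        List<T> batch;
        while (!(batch = fetchBatch.get()).isEmpty()) {
            for (T item : batch) {
                try {
                    buffer.put(item);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        // Shutdown and drain handling are omitted from this sketch.
    }
}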

A job suspended by unscheduleAllTriggers=true is not reflected in monitoring

@Override
public void handleJobExecutionException(JobExecutionException jobExecutionException) throws JobExecutionException {
    jobExecutionException.setUnscheduleAllTriggers(true);
    throw jobExecutionException;
}

After the exception is intercepted, the trigger has already been unscheduled, but this state is not reflected in monitoring.

Expected result:
When unscheduleAllTriggers=true is set and the trigger is unscheduled, the console should also show the job status as exception.

Check the time difference between job servers and the registry center

When a job runs on multiple machines, each instance is scheduled locally by the Quartz component.
Quartz is strongly time-dependent and relies on the local operating system clock.
If the operating system clocks of the machines differ and the job's behaviour depends on time, the job will run erratically.

Suggestion: when a machine starts a job instance, if any job is configured as strongly time-dependent, compare the local time against ZooKeeper; if the difference is too large, throw a startup exception and abort the startup.

Installing an NTP client on every host, keeping it running, and syncing with a time server in real time also solves this problem.

Example of a strongly time-dependent job:
A job named A checks, on every run, whether the time since its previous run is less than 30 seconds; if it is, A exits, otherwise it continues.
If A is deployed on 2 machines whose operating system clocks differ by 20 minutes, running A distributed across those 2 machines will go wrong.
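For illustration, the kind of guard such a job A might use; the class below is hypothetical:

// Illustration of the strongly time-dependent job described above. The guard compares
// against the local OS clock, so a 20-minute skew between two hosts defeats the
// "ran less than 30 seconds ago" check entirely.
public abstract class ThrottledJob {

    private volatile long lastRunEpochMillis;

    public final void execute() {
        long now = System.currentTimeMillis();   // local clock: differs per host if clocks drift
        if (now - lastRunEpochMillis < 30_000L) {
            return;                              // looks "too soon", so this run is skipped
        }
        lastRunEpochMillis = now;
        doWork();
    }

    protected abstract void doWork();
}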

Job type interface changes caused by the new batch-processing feature

For clearer classification, com.dangdang.ddframe.job.plugin.job.type.AbstractSimpleElasticJob has been moved to com.dangdang.ddframe.job.plugin.job.type.simple.AbstractSimpleElasticJob, and the dataflow job types have been moved to the com.dangdang.ddframe.job.plugin.job.type.dataflow package.

Because of the feature upgrade, the number of dataflow job interfaces grows from 2 to 4: the former AbstractThroughputDataFlowElasticJob and AbstractSequenceDataFlowElasticJob become AbstractIndividualThroughputDataFlowElasticJob and AbstractIndividualSequenceDataFlowElasticJob, and two new job types are added: AbstractBatchThroughputDataFlowElasticJob and AbstractBatchSequenceDataFlowElasticJob.

ArrayIndexOutOfBoundsException thrown from RotateServerByNameJobShardingStrategy

In production we found that some jobs using the RotateServerByNameJobShardingStrategy sharding strategy throw an ArrayIndexOutOfBoundsException. The cause is that the call to jobName.hashCode() at line 26 of RotateServerByNameJobShardingStrategy can return a negative value, which produces an out-of-bounds array index (a sketch of the fix direction follows the stack trace below).

org.quartz.SchedulerException: Job threw an unhandled exception.
at org.quartz.core.JobRunShell.run(JobRunShell.java:213) ~[org.quartz-scheduler.quartz-2.2.1.jar:na]
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) [org.quartz-scheduler.quartz-2.2.1.jar:na]
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:418) ~[na:1.8.0_60]
at java.util.ArrayList.get(ArrayList.java:431) ~[na:1.8.0_60]
at com.dangdang.ddframe.job.plugin.sharding.strategy.RotateServerByNameJobShardingStrategy.rotateServerList(RotateServerByNameJobShardingStrategy.java:33) ~[com.dangdang.elastic-job-core-1.0.2.jar:na]
at com.dangdang.ddframe.job.plugin.sharding.strategy.RotateServerByNameJobShardingStrategy.sharding(RotateServerByNameJobShardingStrategy.java:21) ~[com.dangdang.elastic-job-core-1.0.2.jar:na]
at com.dangdang.ddframe.job.internal.sharding.ShardingService.shardingIfNecessary(ShardingService.java:107) ~[com.dangdang.elastic-job-core-1.0.2.jar:na]
at com.dangdang.ddframe.job.internal.job.AbstractElasticJob.execute(AbstractElasticJob.java:73) ~[com.dangdang.elastic-job-core-1.0.2.jar:na]
at org.quartz.core.JobRunShell.run(JobRunShell.java:202) ~[org.quartz-scheduler.quartz-2.2.1.jar:na]
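A sketch of the fix direction for the negative hash: normalize the rotation offset before indexing into the server list. serverList and jobName stand for the strategy's inputs; this is not the actual RotateServerByNameJobShardingStrategy code:

import java.util.ArrayList;
import java.util.List;

// Sketch only: Math.floorMod always yields a value in [0, size), unlike the raw %
// operator on a negative hashCode (and unlike Math.abs, which still breaks for
// Integer.MIN_VALUE).
static List<String> rotateServerList(List<String> serverList, String jobName) {
    int size = serverList.size();
    int offset = Math.floorMod(jobName.hashCode(), size);
    List<String> rotated = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
        rotated.add(serverList.get((i + offset) % size));
    }
    return rotated;
}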

Add a listener event to FailoverListenerManager

If a sharding item has a failover record and the item is re-run so that its status changes from running to completed, then when the /testJob/execution/0/completed node is added, should the failover record for that item, for example /testJob/execution/0/failover, be deleted?

Make the sharding strategy configurable

Change the current default, which shards by sorting servers by IP, into a configurable strategy, so that custom sharding strategies can be implemented later and selected through configuration.
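For illustration, a hypothetical shape such a pluggable strategy SPI could take; the interface name and signature below are assumptions, not the actual elastic-job API:

import java.util.List;
import java.util.Map;

// Hypothetical SPI shape: a job would reference an implementation class by name in its
// configuration instead of being hard-wired to the IP-sorted sharding strategy.
public interface PluggableShardingStrategy {

    // serverIps: all online job servers; shardingTotalCount: total number of sharding items.
    // Returns the mapping from server IP to the sharding items it should run.
    Map<String, List<Integer>> shard(List<String> serverIps, String jobName, int shardingTotalCount);
}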

Add job monitoring and alerting interfaces

Define a clean alerting interface; how the alert is actually delivered is left to each business to implement against the abstract interface. Also define in detail what should be monitored.
