分布式任务调度系统总体设计

1 架构设计

1.1 设计思想

将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。

将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。

因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

1.2 系统组成

  • 调度模块(调度中心): 负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块; 支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
  • 执行模块(执行器): 负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效; 接收“调度中心”的执行请求、终止请求和日志请求等。

1.3 架构图

1571755485554.png

2 调度模块设计

2.1 调度模块

​ 批量管理子系统中“调度模块”和“任务模块”完全解耦,调度模块进行任务调度时,将会解析不同的任务参数发起远程调用,调用各自的远程执行器服务。这种调用模型类似RPC调用,调度中心提供调用代理的功能,而执行器提供远程服务的功能。

2.2 调度中心HA(集群)

​ 基于数据库的集群方案,数据库选用Oracle;集群分布式并发环境中进行定时任务调度时,在各个节点上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。

2.3 调度线程池

​ 调度采用线程池方式实现,避免单线程因阻塞而引起任务调度延迟。

2.4 并行调度

​ 批量管理子系统调度模块默认采用并行机制,在多线程调度的情况下,调度模块被阻塞的几率很低,大大提高了调度系统的承载量。

​ 批量管理子系统的每个调度任务虽然在调度模块是并行调度执行的,但是任务调度传递到任务模块的“执行器”是串行执行的,同时支持任务终止。

2.5 过期处理策略

任务调度错过触发时间时的处理策略:

  • 可能原因:服务重启;调度线程被阻塞,线程被耗尽;上次调度持续阻塞,下次调度被错过;
  • 处理策略:
    • 过期超5s:本地忽略,当前时间开始计算下次触发时间
    • 过期5s内:立即触发一次,当前时间开始计算下次触发时间

2.6 日志回调服务

调度模块的“调度中心”作为Web服务部署时,一方面承担调度中心功能,另一方面也为执行器提供API服务。

调度中心提供的”日志回调服务API服务”代码位置如下:

  1. easy-job-admin#com.git.job.admin.controller.JobApiController.callback

“执行器”在接收到任务执行请求后,执行任务,在执行结束之后会将执行结果回调通知“调度中心”:

2.7 任务HA(Failover)

执行器如若集群部署,调度中心支持配置多个在线的执行器,如“127.0.0.1:9997, 127.0.0.1:9998, 127.0.0.1:9999”。

当任务”路由策略”选择”故障转移(FAILOVER)”时,调度中心每次发起调度请求时,会按照顺序对执行器发出心跳检测请求,第一个检测为存活状态的执行器将会被选定并发送调度请求。

调度成功后,可在日志监控界面查看“调度备注”;

“调度备注”可以看出本地调度运行轨迹,执行器的”注册方式”、”地址列表”和任务的”路由策略”。”故障转移(FAILOVER)”路由策略下,调度中心首先对第一个地址进行心跳检测,心跳失败自动跳过,第二个依然心跳检测失败…… 直至心跳检测第三个地址“127.0.0.1:9999”成功,选定为“目标执行器”;然后对“目标执行器”发送调度请求,调度流程结束,等待执行器回调执行结果。

2.8 调度日志

调度中心每次进行任务调度,都会记录一条任务日志,任务日志主要包括以下三部分内容:

  • 任务信息:包括“执行器地址”、“JobHandler”和“执行参数”等属性,点击任务ID按钮可查看,根据这些参数,可以精确的定位任务执行的具体机器和任务代码;
  • 调度信息:包括“调度时间”、“调度结果”和“调度日志”等,根据这些参数,可以了解“调度中心”发起调度请求时具体情况。
  • 执行信息:包括“执行时间”、“执行结果”和“执行日志”等,根据这些参数,可以了解在“执行器”端任务执行的具体情况;

调度日志,针对单次调度,属性说明如下:

  • 执行器地址:任务执行的机器地址;
  • JobHandler:Bean模式表示任务执行的JobHandler名称;
  • 任务参数:任务执行的入参;
  • 调度时间:调度中心,发起调度的时间;
  • 调度结果:调度中心,发起调度的结果,SUCCESS或FAIL;
  • 调度备注:调度中心,发起调度的备注信息,如地址心跳检测日志等;
  • 执行时间:执行器,任务执行结束后回调的时间;
  • 执行结果:执行器,任务执行的结果,SUCCESS或FAIL;
  • 执行备注:执行器,任务执行的备注信息,如异常日志等;
  • 执行日志:任务执行过程中,业务代码中打印的完整执行日志;

2.9 任务依赖

原理:批量管理子系统中每个任务都对应有一个任务ID,同时,每个任务支持设置属性“子任务ID”,因此,通过“任务ID”可以匹配任务依赖关系。

当父任务执行结束并且执行成功时,将会根据“子任务ID”匹配子任务依赖,如果匹配到子任务,将会主动触发一次子任务的执行。

在任务日志界面,点击任务的“执行备注”的“查看”按钮,可以看到匹配子任务以及触发子任务执行的日志信息,如无信息则表示未触发子任务执行。

2.10 全异步化 & 轻量级

  • 全异步化设计:批量管理子系统系统中业务逻辑在远程执行器执行,触发流程全异步化设计。相比直接在调度中心内部执行业务逻辑,极大的降低了调度线程占用时间;
    • 异步调度:调度中心每次任务触发时仅发送一次调度请求,该调度请求首先推送“异步调度队列”,然后异步推送给远程执行器
    • 异步执行:执行器会将请求存入“异步执行队列”并且立即响应调度中心,异步运行。
  • 轻量级设计:批量管理子系统调度中心中每个JOB逻辑非常 “轻”,在全异步化的基础上,单个JOB一次运行平均耗时基本在 “10ms” 之内(基本为一次请求的网络开销);因此,可以保证使用有限的线程支撑大量的JOB并发运行;

实际场景中,由于调度中心与执行器网络ping延迟不同、DB读写耗时不同、任务调度密集程度不同,会导致任务量上限会上下波动。

如若需要支撑更多的任务量,可以通过 “调大调度线程数” 、”降低调度中心与执行器ping延迟” 和 “提升机器配置” 几种方式优化。

3 任务 “运行模式” 设计

3.1 “Bean模式” 任务

原理:每个Bean模式任务都是一个Spring的Bean类实例,它被维护在“执行器”项目的Spring容器中。任务类需要加“@JobHandler(value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。任务类需要继承统一接口“IJobHandler”,任务逻辑在execute方法中开发,因为“执行器”在接收到调度中心的调度请求时,将会调用“IJobHandler”的execute方法,执行任务逻辑。

任务类型:执行器内支持多个Bean模式任务Handler并存,可以直接配置使用,如下:

  • DefaultJobHandler:默认Java任务,继承IJobHandler,实现日终任务逻辑;
  • ShardingJobHandler:分片示例任务,任务内部模拟处理分片参数。
  • HttpJobHandler:通用HTTP任务Handler;业务方只需要提供HTTP链接即可,不限制语言、平台;

除此之外,平台支持其他任务类型的扩展,如执行服务器命令、调度存储过程、执行shell等。

3.2 执行器

执行器实际上是一个内嵌的Server,默认端口9999(配置项:easy.job.executor.port)。

在项目启动时,执行器会通过“@JobHandler”识别Spring容器中“Bean模式任务”,以注解的value属性为key管理起来。

“执行器”接收到“调度中心”的调度请求时,如果任务类型为“Bean模式”,将会匹配Spring容器中的“Bean模式任务”,然后调用其execute方法,执行任务逻辑。如果任务类型为“GLUE模式”,将会加载GLue代码,实例化Java对象,注入依赖的Spring服务(注意:Glue代码中注入的Spring服务,必须存在与该“执行器”项目的Spring容器中),然后调用execute方法,执行任务逻辑。

3.3 任务日志

批量管理子系统会为每次调度请求生成一个单独的日志文件,需要通过 “EasyJobLogger.log” 打印执行日志,“调度中心”查看执行日志时将会加载对应的日志文件。

日志文件存放的位置可在“执行器”配置文件进行自定义,默认目录格式为:/data/applogs/easy-job/jobhandler/“格式化日期”/“数据库调度日志记录的主键ID.log”。

在JobHandler中开启子线程时,子线程将会把日志打印在父线程即JobHandler的执行日志中,方便日志追踪。

4 通讯模块设计

4.1 一次完整的任务调度通讯流程

  1. - 1、“调度中心”向“执行器”发送http调度请求: “执行器”中接收请求的服务,实际上是一台内嵌Server,默认端口9999;
  2. - 2、“执行器”执行任务逻辑;
  3. - 3、“执行器”http回调“调度中心”调度结果: “调度中心”中接收回调的服务,是针对执行器开放一套API服务;

4.2 通讯数据加密

调度中心向执行器发送的调度请求时使用RequestModel和ResponseModel两个对象封装调度请求参数和响应数据, 在进行通讯之前底层会将上述两个对象对象序列化,并进行数据协议以及时间戳检验,从而达到数据加密的功能;

5 任务注册, 任务自动发现

通过任务注册和自动发现的方式, 动态获取远程执行器地址并执行。

  1. AppName: 每个执行器机器集群的唯一标示, 任务注册以 "执行器" 为最小粒度进行注册; 每个任务通过其绑定的执行器可感知对应的执行器机器列表;
  2. 注册表: "tb_job_registry"表, "执行器" 在进行任务注册时将会周期性维护一条注册记录,即机器地址和AppName的绑定关系; "调度中心" 从而可以动态感知每个AppName在线的机器列表;
  3. 执行器注册: 任务注册心跳(Beat)周期默认30s; 执行器以一倍心跳(Beat)进行执行器注册, 调度中心以一倍心跳(Beat)进行动态任务发现; 注册信息的失效时间为三倍心跳(Beat);
  4. 执行器注册摘除:执行器销毁时,将会主动上报调度中心并摘除对应的执行器机器信息,提高心跳注册的实时性;

为保证系统”轻量级”并且降低学习部署成本,没有采用Zookeeper作为注册中心,采用DB方式进行任务注册发现;

6 任务执行结果

任务执行结果通过 “IJobHandler” 的返回值 “ReturnT” 进行判断; 当返回值符合 “ReturnT.code == ReturnT.SUCCESS_CODE” 时表示任务执行成功,否则表示任务执行失败,而且可以通过 “ReturnT.msg” 回调错误信息给调度中心; 从而,在任务逻辑中可以方便的控制任务执行结果;

7 分片广播 & 动态分片

执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

“分片广播” 以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。

“分片广播” 和普通任务开发流程一致,不同之处在于可以可以获取分片参数,获取分片参数进行分片业务处理。

  • Java语言任务获取分片参数方式:BEAN
  1. // 可参考Sample示例执行器中的示例任务"ShardingJobHandler"了解试用
  2. ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
  • 分片参数属性说明:
  1. index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
  2. total:总分片数,执行器集群的总机器数量;

该特性适用场景如:

  • 1、分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
  • 2、广播任务场景:广播执行器机器运行shell脚本、广播集群节点进行缓存更新等

8 访问令牌(AccessToken)

为提升系统安全性,调度中心和执行器进行安全性校验,双方AccessToken匹配才允许通讯;

调度中心和执行器,可通过配置项 “easy.job.accessToken” 进行AccessToken的设置。

调度中心和执行器,如果需要正常通讯,只有两种设置;

  • 设置一:调度中心和执行器,均不设置AccessToken;关闭安全性校验;
  • 设置二:调度中心和执行器,设置了相同的AccessToken;

9 调度中心API服务

调度中心提供了API服务,主要分为两种类型:

9.1 提供给执行器的API服务:

  1. 1、任务结果回调服务;
  2. 2、执行器注册服务;
  3. 3、执行器注册摘除服务;
  4. 4、触发任务单次执行服务,支持任务根据业务事件触发;

API服务位置:AdminBiz.java
API服务请求参考代码:AdminBizTest.java

9.2 提供给业务的API服务:

  1. 1、任务列表查询;
  2. 2、任务新增;
  3. 3、任务更新;
  4. 4、任务删除;
  5. 5、任务启动;
  6. 6、任务停止;
  7. 7、任务触发;

API服务位置:com.git.job.admin.controller.JobInfoController.java
API服务请求参考代码:可参考任务界面操作的ajax请求。任何ajax接口均可配置成为API服务,只需在待启用的API服务上添加 “@PermessionLimit(limit = false)” 注解取消登陆态拦截即可;

10 执行器API服务

执行器提供了API服务,供调度中心选择使用,目前提供的API服务有:

  1. 1、心跳检测:调度中心使用
  2. 2、忙碌检测:调度中心使用
  3. 3、触发任务执行:调度中心使用;本地进行任务开发时,可使用该API服务模拟触发任务;
  4. 4、获取Rolling Log:调度中心使用
  5. 5、终止任务:调度中心使用

API服务位置:ExecutorBiz
API服务请求参考代码:ExecutorBizTest

11 运行维护

11.1 故障转移 & 失败重试

一次完整任务流程包括”调度(调度中心) + 执行(执行器)”两个阶段。

  • “故障转移”发生在调度阶段,在执行器集群部署时,如果某一台执行器发生故障,该策略支持自动进行Failover切换到一台正常的执行器机器并且完成调度请求流程。
  • “失败重试”发生在”调度 + 执行”两个阶段,支持通过自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;

11.2 执行器灰度上线

调度中心与业务解耦,只需部署一次后常年不需要维护。但是,执行器中托管运行着业务作业,作业上线和变更需要重启执行器,尤其是Bean模式任务。 执行器重启可能会中断运行中的任务。但是,批量管理子系统拥有自建执行器与自建注册中心,可以通过灰度上线的方式,避免因重启导致的任务中断的问题。

步骤如下:

  • 1、执行器改为手动注册,下线一半机器列表(A组),线上运行另一半机器列表(B组);
  • 2、等待A组机器任务运行结束并编译上线;执行器注册地址替换为A组;
  • 3、等待B组机器任务运行结束并编译上线;执行器注册地址替换为A组+B组; 操作结束;

11.3 任务执行结果说明

系统根据以下标准判断任务执行结果,可参考之。

执行结果 Bean任务返回值
成功 IJobHandler.SUCCESS
失败 IJobHandler.FAIL

11.4 任务超时控制

支持设置任务超时时间,任务运行超时的情况下,将会主动中断任务;

需要注意的是,任务超时中断时与任务终止机制类似,也是通过 “interrupt” 中断任务,因此业务代码需要将 “InterruptedException” 外抛,否则功能不可用。

11.5 任务失败告警

默认提供邮件失败告警,可扩展短信方式,扩展代码位置为 “JobFailMonitorHelper.failAlarm”;

11.6 避免任务重复执行

调度密集或者耗时任务可能会导致任务阻塞,集群情况下调度组件小概率情况下会重复触发; 针对上述情况,可以通过结合 “单机路由策略(如:第一台、一致性哈希)” + “阻塞策略(如:单机串行、丢弃后续调度)” 来规避,最终避免任务重复执行。

12 运行监控

运行报表:支持实时查看运行数据,如任务数量、调度次数、执行器数量等;以及调度报表,如调度日期分布图,调度成功分布图等; 运行日志查看:系统提供日终任务执行日志查看功能,提供方便的筛选和过滤条件;