开发调度任务

开发任务逻辑

步骤

  • 1、继承”IJobHandler”:“IJobHandler”;
  • 2、注册到Spring容器:添加“@Component”注解,被Spring容器扫描为Bean实例;
  • 3、注册到执行器工厂:添加“@JobHandler(value=”自定义jobhandler名称”)”注解,注解value值对应的是调度中心新建任务的JobHandler属性的值;
  • 4、执行日志:需要通过 “EasyJobLogger.log” 打印执行日志;

Bean模式

  1. @JobHandler(value="demo3JobHandler")@Component
  2. public class Demo3JobHandler extends IJobHandler {
  3. @Override
  4. public ReturnT<String> execute(String param) throws Exception {
  5. EasyJobLogger.log("EASY-JOB, Hello World.");
  6. // Map map = JSONUtil.toMap(param);
  7. // String legPerCod = (String) map.get("legPerCod");
  8. // if("39051".equals(legPerCod)||"22699".equals(legPerCod)){
  9. // throw new Exception("aaaaaaaaaaaaaaaa");
  10. // }
  11. Thread.sleep(1000L*10L);
  12. return SUCCESS;
  13. }
  14. }

线程池处理任务

  1. @JobHandler(value="parallelJobHandler")@Component
  2. public class ParallelJobHandler extends IJobHandler {
  3. private static Log log = LogFactory.get();
  4. private ThreadPool pool;
  5. /**
  6. * 初始化日终任务
  7. */
  8. @Override
  9. public void init(){
  10. try {
  11. PoolHelpUtil poolHelpUtil = new PoolHelpUtilImpl();
  12. pool = ThreadPool.getInstance(poolHelpUtil);
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. /**
  18. * 执行日终任务
  19. * @param param
  20. * @return
  21. * @throws Exception
  22. */
  23. @Override
  24. public ReturnT<String> execute(String param) throws Exception {
  25. Map<String,Object> params = JSONUtil.toMap(param);
  26. String jobId = (String) params.get("jobId");
  27. /**
  28. * 待处理数据集合
  29. */
  30. List<WorkTask> dataList = new ArrayList();
  31. for (int i = 0; i < 100; i++) {
  32. //业务逻辑
  33. WorkTask task = new WorkTaskImp("Easy Etl线程池");
  34. TaskBean taskBean = new TaskBean();
  35. taskBean.setTaskId("task_"+i);
  36. taskBean.setUuid(UUID.randomUUID().toString());
  37. taskBean.setTaskName("测试任务-"+i);
  38. taskBean.setTaskNo(jobId);
  39. task.setTaskBean(taskBean);
  40. task.setTaskThreadKey("BATCH");
  41. dataList.add(task);
  42. }
  43. //把任务装入任务线程队列中
  44. pool.getTaskManager().getTaskQueue(dataList,"BATCH");
  45. //轮询判断任务是否正常完成
  46. while(true){
  47. //该任务在任务队列中的个数
  48. log.info("taskcountQueueSize---->"+pool.getTaskManager().getTaskcountQueueSize());
  49. int taskCount = pool.getTaskManager().getTaskCountQueueSize(jobId);
  50. int threadCount = 0;
  51. Vector<WorkThread> threadList = pool.getThredlist("BATCH");
  52. for (WorkThread workThread:threadList) {
  53. // log.info("线程类型:{},线程运行状态:{},线程名称:{}",workThread.getThreadKey(),workThread.getMyState(),workThread.getInfo());
  54. //当线程队列里面的线程为运行状态,且属于改任务ID的线程时
  55. if("BATCH".equals(workThread.getThreadKey()) && WorkThread.RUNSTATE.equals(workThread.getMyState()) && jobId.equals(workThread.getInfo())){
  56. threadCount ++;
  57. }else{
  58. continue;
  59. }
  60. }
  61. EasyJobLogger.log("剩余任务数【{}】,正在执行任务线程数【{}】!",taskCount,threadCount);
  62. // log.debug("剩余任务明细:{}", pool.getTaskManager().getTaskInfo());
  63. //当线程池任务队列任务数为空时,关闭线程池,返回任务成功状态
  64. if(taskCount == 0 && threadCount == 0){
  65. // pool.close();
  66. return SUCCESS;
  67. }
  68. //等待指定时间后重新检查任务是否完
  69. Thread.sleep(Integer.parseInt(params.get("poolTime")+""));
  70. }
  71. }
  72. }
  73. * 业务逻辑类 WorkTaskImp
  74. public class WorkTaskImp implements WorkTask {private static Log log = LogFactory.get();
  75. protected String param;
  76. /**
  77. * 为了显示执行线程编号
  78. */
  79. protected Object threadkey;
  80. /**
  81. * 任务执行时间
  82. */
  83. protected final int TaskExecTime = 5000;
  84. protected TaskBean taskBean;
  85. @Override
  86. public void execute() throws Exception {
  87. log.info("开始备份当日流水。。。");
  88. Thread.sleep(TaskExecTime);
  89. log.info("流水备份完成!!!");
  90. }
  91. public WorkTaskImp(String param) {
  92. this.param = param;
  93. }
  94. @Override
  95. public void setTaskThreadKey(Object key) {
  96. this.threadkey = key;
  97. }
  98. @Override
  99. public String toString() {
  100. return "【"+ taskBean.getTaskName() + "】" +param + "工作线程编号" + threadkey.toString();
  101. }
  102. @Override
  103. public Object getTaskThreadKey() {
  104. return threadkey;
  105. }
  106. @Override
  107. public TaskBean getTaskBean() {
  108. return taskBean;
  109. }
  110. @Override
  111. public void setTaskBean(TaskBean taskBean) {
  112. this.taskBean = taskBean;
  113. }
  114. }

分片广播任务

执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务。使用场景:

  1. 1. 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
  2. 2. 广播任务场景:广播执行器机器运行shell脚本、广播集群节点进行缓存更新等
  1. @JobHandler(value="shardingJobHandler")@Service
  2. public class ShardingJobHandler extends IJobHandler {
  3. @Override
  4. public ReturnT<String> execute(String param) throws Exception {
  5. // 分片参数
  6. ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
  7. EasyJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
  8. // 业务逻辑
  9. for (int i = 0; i < shardingVO.getTotal(); i++) {
  10. if (i == shardingVO.getIndex()) {
  11. EasyJobLogger.log("第 {} 片, 命中分片开始处理", i);
  12. System.out.println("第 "+i+" 片, 命中分片开始处理");
  13. } else {
  14. EasyJobLogger.log("第 {} 片, 忽略", i);
  15. System.out.println("第 "+i+" 片, 忽略");
  16. }
  17. }
  18. return SUCCESS;
  19. }
  20. }

开发kettle任务

使用kettle的Spoon工具开发功能

看菜单 第四章之spoon工具使用教程

子系统存放kettle文件位置

Image111.png 将开发完成的功能的文件,存储如下位置。 以easyloan-classify为例: D:\IdeaPOCWork\easyloan-classify\classify-springboot\src\main\resources\kettle在Linux系统上打包部署时,会将kettle目录拷贝到服务器app/tomcat/scms/easyloan/下。

kettleJobHandler调度执行器

该调度任务在easy-job-core jar包当中,子系统依赖easy-job-core,就拥有了在调度中心配置kettle执行的调度任务

  1. /*** Kettle任务Handler示例(Bean模式)
  2. *
  3. * 开发步骤:
  4. * 1、继承"IJobHandler":“IJobHandler”;
  5. * 2、注册到Spring容器:添加“@Component”注解,被Spring容器扫描为Bean实例;
  6. * 3、注册到执行器工厂:添加“@JobHandler(value="自定义jobhandler名称")”注解,注解value值对应的是调度中心新建任务的JobHandler属性的值。
  7. * 4、执行日志:需要通过 "EasyJobLogger.log" 打印执行日志;
  8. *
  9. * @author git 2015-12-19 19:43:36
  10. */
  11. @JobHandler(value="kettleJobHandler")
  12. @Component
  13. public class KettleJobHandler extends IJobHandler {
  14. @Override
  15. public ReturnT<String> execute(String param) throws Exception {
  16. EasyJobLogger.log("EASY-JOB,Kettle批量任务开始.{}", DateUtil.now());
  17. Map<String, Object> params = JSONUtil.toMap(param);
  18. String kettleFile = (String) params.get("kettleName");
  19. EasyJobLogger.log("开始执行kettle任务:{}", params.get("kettleName"));
  20. EasyJobLogger.log("执行脚本参数:{}",param);
  21. String batDate = (String) params.get("batDate");//批量参数
  22. SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
  23. Date curDate = sdf.parse(batDate);//批量日期
  24. Date curDateB = com.git.job.core.core.util.DateUtil.addDays(curDate,-1);//批量前一日
  25. Date curDateN = com.git.job.core.core.util.DateUtil.addDays(curDate,1);//批量下一日
  26. Date curDateBM = com.git.job.core.core.util.DateUtil.addMonths(curDate,-1);//跑批前一月
  27. Date curDateBME = DateUtil.endOfMonth(curDateBM);//跑批前一月最后一天
  28. Date curDateMB = DateUtil.beginOfMonth(curDate);//跑批当月第一天
  29. Date curDateME = DateUtil.endOfMonth(curDate);//跑批当月最后一天
  30. Date curDateFB = com.git.job.core.core.util.DateUtil.addMonths(curDate,-3);//跑批前一季
  31. String curDateFB_Mtemp = DateUtil.format(curDateFB,"MM");
  32. String curDateFB_Ytemp = DateUtil.format(curDateFB,"yyyy");
  33. String curDateFB_YM = null;//QUA_BEF_DATE 跑批前一季 20193
  34. if(curDateFB_Mtemp.equals("01") || curDateFB_Mtemp.equals("02") || curDateFB_Mtemp.equals("03")){
  35. curDateFB_YM = curDateFB_Ytemp + "1";
  36. }else if(curDateFB_Mtemp.equals("04") || curDateFB_Mtemp.equals("05") || curDateFB_Mtemp.equals("06")){
  37. curDateFB_YM = curDateFB_Ytemp + "2";
  38. }else if(curDateFB_Mtemp.equals("07") || curDateFB_Mtemp.equals("08") || curDateFB_Mtemp.equals("09")){
  39. curDateFB_YM = curDateFB_Ytemp + "3";
  40. }else if(curDateFB_Mtemp.equals("10") || curDateFB_Mtemp.equals("11") || curDateFB_Mtemp.equals("12")){
  41. curDateFB_YM = curDateFB_Ytemp + "4";
  42. }
  43. String curDateFN_Mtemp = DateUtil.format(curDate,"MM");
  44. String curDateFN_Ytemp = DateUtil.format(curDate,"yyyy");
  45. String curDateFN_YM = null;//QUA_DATE 跑批当季 20194
  46. String curDateFN_YM_FM = null;//QUA_DATE_FIR_MON 跑批当季第一月 201910
  47. String curDateFN_YM_EM = null;///QUA_DATE_LAS_MON 跑批当季最后一月 201912
  48. String curDateHN = null;//YEA_HAF_DATE 跑批半年 2019B
  49. String curDateHN_FM = null;///YEA_HAF_FIR_MON 跑批半年第一个月 201907
  50. if(curDateFN_Mtemp.equals("01") || curDateFN_Mtemp.equals("02") || curDateFN_Mtemp.equals("03")){
  51. curDateFN_YM = curDateFN_Ytemp + "1";
  52. curDateFN_YM_FM = curDateFN_Ytemp + "01";
  53. curDateFN_YM_EM = curDateFN_Ytemp + "03";
  54. curDateHN = curDateFN_Ytemp + "A";
  55. curDateHN_FM = curDateFN_Ytemp + "01";
  56. }else if(curDateFN_Mtemp.equals("04") || curDateFN_Mtemp.equals("05") || curDateFN_Mtemp.equals("06")){
  57. curDateFN_YM = curDateFN_Ytemp + "2";
  58. curDateFN_YM_FM = curDateFN_Ytemp + "04";
  59. curDateFN_YM_EM = curDateFN_Ytemp + "06";
  60. curDateHN = curDateFN_Ytemp + "A";
  61. curDateHN_FM = curDateFN_Ytemp + "01";
  62. }else if(curDateFN_Mtemp.equals("07") || curDateFN_Mtemp.equals("08") || curDateFN_Mtemp.equals("09")){
  63. curDateFN_YM = curDateFN_Ytemp + "3";
  64. curDateFN_YM_FM = curDateFN_Ytemp + "07";
  65. curDateFN_YM_EM = curDateFN_Ytemp + "09";
  66. curDateHN = curDateFN_Ytemp + "B";
  67. curDateHN_FM = curDateFN_Ytemp + "07";
  68. }else if(curDateFN_Mtemp.equals("10") || curDateFN_Mtemp.equals("11") || curDateFN_Mtemp.equals("12")){
  69. curDateFN_YM = curDateFN_Ytemp + "4";
  70. curDateFN_YM_FM = curDateFN_Ytemp + "10";
  71. curDateFN_YM_EM = curDateFN_Ytemp + "12";
  72. curDateHN = curDateFN_Ytemp + "B";
  73. curDateHN_FM = curDateFN_Ytemp + "07";
  74. }
  75. Date curDateYB = com.git.job.core.core.util.DateUtil.addYears(curDate,-1);//跑批前一年
  76. String curDateYB_Y = DateUtil.format(curDateYB,"yyyy");//跑批前一年
  77. String curDateY_Y = DateUtil.format(curDate,"yyyy");//跑批前一年
  78. params.put("DAYDATE",batDate); //日期参数
  79. params.put("DAY_BEF_DATE",DateUtil.format(curDateB,"yyyyMMdd"));//跑批前一天
  80. params.put("DAY_DATE",batDate);//跑批当天
  81. params.put("MON_BEF_DATE",DateUtil.format(curDateBM,"yyyyMM")); //跑批前一月
  82. params.put("MON_BEF_LAS_DATE",DateUtil.format(curDateBME,"yyyyMMdd"));//跑批前一月最后一天
  83. params.put("MON_DATE",DateUtil.format(curDate,"yyyyMM"));//跑批当月
  84. params.put("MON_DATE_FIR_DAY",DateUtil.format(curDateMB,"yyyyMMdd"));//跑批当月第一天
  85. params.put("MON_DATE_LAS_DAY",DateUtil.format(curDateME,"yyyyMMdd"));//跑批当月最后一天
  86. params.put("QUA_BEF_DATE",curDateFB_YM);//跑批前一季
  87. params.put("QUA_DATE",curDateFN_YM);//跑批当季
  88. params.put("QUA_DATE_FIR_MON",curDateFN_YM_FM);//跑批当季第一月
  89. params.put("QUA_DATE_LAS_MON",curDateFN_YM_EM);//跑批当季最后一月
  90. params.put("YEA_HAF_DATE",curDateHN);//跑批半年
  91. params.put("YEA_HAF_FIR_MON",curDateHN_FM);//跑批半年第一个月
  92. params.put("YER_BEF_DATE",curDateYB_Y);//跑批前一年
  93. params.put("YER_DATE",curDateY_Y);//跑批当年
  94. params.put("YER_DATE_FIR_DAY",curDateY_Y + "0101");//跑批当年第一天
  95. params.put("YER_DATE_FIR_MON",curDateY_Y + "01");//跑批当年第一月
  96. params.put("YER_DATE_LAS_DAY",curDateY_Y + "1231");//跑批当年最后一天
  97. params.put("YER_DATE_LAS_MON",curDateY_Y + "12");//跑批当年最后一月
  98. //modify begin by gh @date: 2020/7/15 跑批 星期 1到7对应周日到周六
  99. params.put("WEEK",String.valueOf(DateUtil.dayOfWeek(curDate)));//跑批 星期 1到7对应周日到周六
  100. //modify end by gh @date: 2020/7/15
  101. String retMsg = "0000:任务执行成功!";
  102. for(String key : params.keySet()){
  103. EasyJobLogger.log("key="+key+"and value="+params.get(key));
  104. }
  105. KettleUtilsImp kettleUtilsImp = new KettleUtilsImp();
  106. if(KettleTools.isJob(kettleFile)){
  107. retMsg = kettleUtilsImp.callNativeJob(kettleFile,params);
  108. }else if (KettleTools.isKtr(kettleFile)){
  109. retMsg = kettleUtilsImp.callNativeTrans(kettleFile,params);
  110. }else{
  111. throw new IOException("不支持的kettle脚本类型!");
  112. }
  113. if(!"0000:任务执行成功!".equals(retMsg)){
  114. throw new IOException(retMsg + "kettle执行失败!");
  115. }
  116. EasyJobLogger.log("kettle日志"+retMsg,DateUtil.now());
  117. EasyJobLogger.log("EASY-JOB,Kettle批量任务结束.{}", DateUtil.now());
  118. return SUCCESS;
  119. }
  120. }

注意: 在kettle中可以执行中可以直接使用许多日期变量。kettleJobHandler调用kettle时,会传入参数。

  1. params.put("DAYDATE",batDate); //日期参数 params.put("DAY_BEF_DATE",DateUtil.format(curDateB,"yyyyMMdd"));//跑批前一天 params.put("DAY_DATE",batDate);//跑批当天 params.put("MON_BEF_DATE",DateUtil.format(curDateBM,"yyyyMM")); //跑批前一月 params.put("MON_BEF_LAS_DATE",DateUtil.format(curDateBME,"yyyyMMdd"));//跑批前一月最后一天 params.put("MON_DATE",DateUtil.format(curDate,"yyyyMM"));//跑批当月 params.put("MON_DATE_FIR_DAY",DateUtil.format(curDateMB,"yyyyMMdd"));//跑批当月第一天 params.put("MON_DATE_LAS_DAY",DateUtil.format(curDateME,"yyyyMMdd"));//跑批当月最后一天 params.put("QUA_BEF_DATE",curDateFB_YM);//跑批前一季 params.put("QUA_DATE",curDateFN_YM);//跑批当季 params.put("QUA_DATE_FIR_MON",curDateFN_YM_FM);//跑批当季第一月 params.put("QUA_DATE_LAS_MON",curDateFN_YM_EM);//跑批当季最后一月 params.put("YEA_HAF_DATE",curDateHN);//跑批半年 params.put("YEA_HAF_FIR_MON",curDateHN_FM);//跑批半年第一个月 params.put("YER_BEF_DATE",curDateYB_Y);//跑批前一年 params.put("YER_DATE",curDateY_Y);//跑批当年 params.put("YER_DATE_FIR_DAY",curDateY_Y + "0101");//跑批当年第一天 params.put("YER_DATE_FIR_MON",curDateY_Y + "01");//跑批当年第一月 params.put("YER_DATE_LAS_DAY",curDateY_Y + "1231");//跑批当年最后一天 params.put("YER_DATE_LAS_MON",curDateY_Y + "12");//跑批当年最后一月 params.put("WEEK",String.valueOf(DateUtil.dayOfWeek(curDate)));//跑批 星期 1到7对应周日到周六

配置调度中心任务

  1. 1. 查询tb_job_group 确认执行器id
  2. 2. 确定任务在日终任务当中位置。触发任务,依赖任务,子任务。
  3. 3. 插入tb_job_task
  4. * id 字段 主键id 由系统名+分组+排序组成,后面必须是数据字结尾。
  5. * JOB_GROUP 字段 对应执行器id
  6. * executor_handler 字段,值 kettleJobHandler
  7. * executor_param 字段,传入kettleJobHandler调度中的执行,传入是一个json格式,必须包含一个kettleName值,指kettle绝对路径。

例如:

  1. {"kettleName":"/app/tomcat/scms/easyloan/kettle/dataApplication/TB_CSM_GRO_COM.ktr"}

可以加入其他信息传入 kettleJobHandler。 例如:

  1. {"kettleName":"/app/tomcat/scms/easyloan/kettle/dataApplication/TB_CSM_GRO_COM.ktr","SQLPathKuanBiao":"/app/tomcat/scms/easyloan/kettle/dataApplication","SQLPathApy":"/app/tomcat/scms/easyloan/kettle/dataApplication","SQLPathReport":"/app/tomcat/scms/easyloan/kettle/dataApplication","CsmFilePath":"/app/scms/datafile/send/ecif"}
  1. * job_cron 字段值默认 0/1 * * * * ? * 意思是 1秒触发;Linux|014|cron表达式详解
  2. * child_jobid 字段, 触发任务id,多个任务用英文 逗号隔开“,”。
  3. * dependency_id 字段,依赖任务,多个任务用英文 逗号隔开“,”。当父任务执行完后,触发子任务时,检查依赖任务是否执行成功。所有依赖执行成功才能执行当前任务。