开发调度任务
开发任务逻辑
步骤
- 1、继承”IJobHandler”:“IJobHandler”;
- 2、注册到Spring容器:添加“@Component”注解,被Spring容器扫描为Bean实例;
- 3、注册到执行器工厂:添加“@JobHandler(value=”自定义jobhandler名称”)”注解,注解value值对应的是调度中心新建任务的JobHandler属性的值;
- 4、执行日志:需要通过 “EasyJobLogger.log” 打印执行日志;
Bean模式
@JobHandler(value="demo3JobHandler")@Componentpublic class Demo3JobHandler extends IJobHandler {@Overridepublic ReturnT<String> execute(String param) throws Exception {EasyJobLogger.log("EASY-JOB, Hello World.");// Map map = JSONUtil.toMap(param);// String legPerCod = (String) map.get("legPerCod");// if("39051".equals(legPerCod)||"22699".equals(legPerCod)){// throw new Exception("aaaaaaaaaaaaaaaa");// }Thread.sleep(1000L*10L);return SUCCESS;}}
线程池处理任务
@JobHandler(value="parallelJobHandler")@Componentpublic class ParallelJobHandler extends IJobHandler {private static Log log = LogFactory.get();private ThreadPool pool;/*** 初始化日终任务*/@Overridepublic void init(){try {PoolHelpUtil poolHelpUtil = new PoolHelpUtilImpl();pool = ThreadPool.getInstance(poolHelpUtil);} catch (Exception e) {e.printStackTrace();}}/*** 执行日终任务* @param param* @return* @throws Exception*/@Overridepublic ReturnT<String> execute(String param) throws Exception {Map<String,Object> params = JSONUtil.toMap(param);String jobId = (String) params.get("jobId");/*** 待处理数据集合*/List<WorkTask> dataList = new ArrayList();for (int i = 0; i < 100; i++) {//业务逻辑WorkTask task = new WorkTaskImp("Easy Etl线程池");TaskBean taskBean = new TaskBean();taskBean.setTaskId("task_"+i);taskBean.setUuid(UUID.randomUUID().toString());taskBean.setTaskName("测试任务-"+i);taskBean.setTaskNo(jobId);task.setTaskBean(taskBean);task.setTaskThreadKey("BATCH");dataList.add(task);}//把任务装入任务线程队列中pool.getTaskManager().getTaskQueue(dataList,"BATCH");//轮询判断任务是否正常完成while(true){//该任务在任务队列中的个数log.info("taskcountQueueSize---->"+pool.getTaskManager().getTaskcountQueueSize());int taskCount = pool.getTaskManager().getTaskCountQueueSize(jobId);int threadCount = 0;Vector<WorkThread> threadList = pool.getThredlist("BATCH");for (WorkThread workThread:threadList) {// log.info("线程类型:{},线程运行状态:{},线程名称:{}",workThread.getThreadKey(),workThread.getMyState(),workThread.getInfo());//当线程队列里面的线程为运行状态,且属于改任务ID的线程时if("BATCH".equals(workThread.getThreadKey()) && WorkThread.RUNSTATE.equals(workThread.getMyState()) && jobId.equals(workThread.getInfo())){threadCount ++;}else{continue;}}EasyJobLogger.log("剩余任务数【{}】,正在执行任务线程数【{}】!",taskCount,threadCount);// log.debug("剩余任务明细:{}", pool.getTaskManager().getTaskInfo());//当线程池任务队列任务数为空时,关闭线程池,返回任务成功状态if(taskCount == 0 && threadCount == 0){// pool.close();return SUCCESS;}//等待指定时间后重新检查任务是否完Thread.sleep(Integer.parseInt(params.get("poolTime")+""));}}}* 业务逻辑类 WorkTaskImppublic class WorkTaskImp implements WorkTask {private static Log log = LogFactory.get();protected String param;/*** 为了显示执行线程编号*/protected Object threadkey;/*** 任务执行时间*/protected final int TaskExecTime = 5000;protected TaskBean taskBean;@Overridepublic void execute() throws Exception {log.info("开始备份当日流水。。。");Thread.sleep(TaskExecTime);log.info("流水备份完成!!!");}public WorkTaskImp(String param) {this.param = param;}@Overridepublic void setTaskThreadKey(Object key) {this.threadkey = key;}@Overridepublic String toString() {return "【"+ taskBean.getTaskName() + "】" +param + "工作线程编号" + threadkey.toString();}@Overridepublic Object getTaskThreadKey() {return threadkey;}@Overridepublic TaskBean getTaskBean() {return taskBean;}@Overridepublic void setTaskBean(TaskBean taskBean) {this.taskBean = taskBean;}}
分片广播任务
执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务。使用场景:
1. 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;2. 广播任务场景:广播执行器机器运行shell脚本、广播集群节点进行缓存更新等
@JobHandler(value="shardingJobHandler")@Servicepublic class ShardingJobHandler extends IJobHandler {@Overridepublic ReturnT<String> execute(String param) throws Exception {// 分片参数ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();EasyJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());// 业务逻辑for (int i = 0; i < shardingVO.getTotal(); i++) {if (i == shardingVO.getIndex()) {EasyJobLogger.log("第 {} 片, 命中分片开始处理", i);System.out.println("第 "+i+" 片, 命中分片开始处理");} else {EasyJobLogger.log("第 {} 片, 忽略", i);System.out.println("第 "+i+" 片, 忽略");}}return SUCCESS;}}
开发kettle任务
使用kettle的Spoon工具开发功能
看菜单 第四章之spoon工具使用教程
子系统存放kettle文件位置
将开发完成的功能的文件,存储如下位置。
以easyloan-classify为例:
D:\IdeaPOCWork\easyloan-classify\classify-springboot\src\main\resources\kettle在Linux系统上打包部署时,会将kettle目录拷贝到服务器app/tomcat/scms/easyloan/下。
kettleJobHandler调度执行器
该调度任务在easy-job-core jar包当中,子系统依赖easy-job-core,就拥有了在调度中心配置kettle执行的调度任务
/*** Kettle任务Handler示例(Bean模式)** 开发步骤:* 1、继承"IJobHandler":“IJobHandler”;* 2、注册到Spring容器:添加“@Component”注解,被Spring容器扫描为Bean实例;* 3、注册到执行器工厂:添加“@JobHandler(value="自定义jobhandler名称")”注解,注解value值对应的是调度中心新建任务的JobHandler属性的值。* 4、执行日志:需要通过 "EasyJobLogger.log" 打印执行日志;** @author git 2015-12-19 19:43:36*/@JobHandler(value="kettleJobHandler")@Componentpublic class KettleJobHandler extends IJobHandler {@Overridepublic ReturnT<String> execute(String param) throws Exception {EasyJobLogger.log("EASY-JOB,Kettle批量任务开始.{}", DateUtil.now());Map<String, Object> params = JSONUtil.toMap(param);String kettleFile = (String) params.get("kettleName");EasyJobLogger.log("开始执行kettle任务:{}", params.get("kettleName"));EasyJobLogger.log("执行脚本参数:{}",param);String batDate = (String) params.get("batDate");//批量参数SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");Date curDate = sdf.parse(batDate);//批量日期Date curDateB = com.git.job.core.core.util.DateUtil.addDays(curDate,-1);//批量前一日Date curDateN = com.git.job.core.core.util.DateUtil.addDays(curDate,1);//批量下一日Date curDateBM = com.git.job.core.core.util.DateUtil.addMonths(curDate,-1);//跑批前一月Date curDateBME = DateUtil.endOfMonth(curDateBM);//跑批前一月最后一天Date curDateMB = DateUtil.beginOfMonth(curDate);//跑批当月第一天Date curDateME = DateUtil.endOfMonth(curDate);//跑批当月最后一天Date curDateFB = com.git.job.core.core.util.DateUtil.addMonths(curDate,-3);//跑批前一季String curDateFB_Mtemp = DateUtil.format(curDateFB,"MM");String curDateFB_Ytemp = DateUtil.format(curDateFB,"yyyy");String curDateFB_YM = null;//QUA_BEF_DATE 跑批前一季 20193if(curDateFB_Mtemp.equals("01") || curDateFB_Mtemp.equals("02") || curDateFB_Mtemp.equals("03")){curDateFB_YM = curDateFB_Ytemp + "1";}else if(curDateFB_Mtemp.equals("04") || curDateFB_Mtemp.equals("05") || curDateFB_Mtemp.equals("06")){curDateFB_YM = curDateFB_Ytemp + "2";}else if(curDateFB_Mtemp.equals("07") || curDateFB_Mtemp.equals("08") || curDateFB_Mtemp.equals("09")){curDateFB_YM = curDateFB_Ytemp + "3";}else if(curDateFB_Mtemp.equals("10") || curDateFB_Mtemp.equals("11") || curDateFB_Mtemp.equals("12")){curDateFB_YM = curDateFB_Ytemp + "4";}String curDateFN_Mtemp = DateUtil.format(curDate,"MM");String curDateFN_Ytemp = DateUtil.format(curDate,"yyyy");String curDateFN_YM = null;//QUA_DATE 跑批当季 20194String curDateFN_YM_FM = null;//QUA_DATE_FIR_MON 跑批当季第一月 201910String curDateFN_YM_EM = null;///QUA_DATE_LAS_MON 跑批当季最后一月 201912String curDateHN = null;//YEA_HAF_DATE 跑批半年 2019BString curDateHN_FM = null;///YEA_HAF_FIR_MON 跑批半年第一个月 201907if(curDateFN_Mtemp.equals("01") || curDateFN_Mtemp.equals("02") || curDateFN_Mtemp.equals("03")){curDateFN_YM = curDateFN_Ytemp + "1";curDateFN_YM_FM = curDateFN_Ytemp + "01";curDateFN_YM_EM = curDateFN_Ytemp + "03";curDateHN = curDateFN_Ytemp + "A";curDateHN_FM = curDateFN_Ytemp + "01";}else if(curDateFN_Mtemp.equals("04") || curDateFN_Mtemp.equals("05") || curDateFN_Mtemp.equals("06")){curDateFN_YM = curDateFN_Ytemp + "2";curDateFN_YM_FM = curDateFN_Ytemp + "04";curDateFN_YM_EM = curDateFN_Ytemp + "06";curDateHN = curDateFN_Ytemp + "A";curDateHN_FM = curDateFN_Ytemp + "01";}else if(curDateFN_Mtemp.equals("07") || curDateFN_Mtemp.equals("08") || curDateFN_Mtemp.equals("09")){curDateFN_YM = curDateFN_Ytemp + "3";curDateFN_YM_FM = curDateFN_Ytemp + "07";curDateFN_YM_EM = curDateFN_Ytemp + "09";curDateHN = curDateFN_Ytemp + "B";curDateHN_FM = curDateFN_Ytemp + "07";}else if(curDateFN_Mtemp.equals("10") || curDateFN_Mtemp.equals("11") || curDateFN_Mtemp.equals("12")){curDateFN_YM = curDateFN_Ytemp + "4";curDateFN_YM_FM = curDateFN_Ytemp + "10";curDateFN_YM_EM = curDateFN_Ytemp + "12";curDateHN = curDateFN_Ytemp + "B";curDateHN_FM = curDateFN_Ytemp + "07";}Date curDateYB = com.git.job.core.core.util.DateUtil.addYears(curDate,-1);//跑批前一年String curDateYB_Y = DateUtil.format(curDateYB,"yyyy");//跑批前一年String curDateY_Y = DateUtil.format(curDate,"yyyy");//跑批前一年params.put("DAYDATE",batDate); //日期参数params.put("DAY_BEF_DATE",DateUtil.format(curDateB,"yyyyMMdd"));//跑批前一天params.put("DAY_DATE",batDate);//跑批当天params.put("MON_BEF_DATE",DateUtil.format(curDateBM,"yyyyMM")); //跑批前一月params.put("MON_BEF_LAS_DATE",DateUtil.format(curDateBME,"yyyyMMdd"));//跑批前一月最后一天params.put("MON_DATE",DateUtil.format(curDate,"yyyyMM"));//跑批当月params.put("MON_DATE_FIR_DAY",DateUtil.format(curDateMB,"yyyyMMdd"));//跑批当月第一天params.put("MON_DATE_LAS_DAY",DateUtil.format(curDateME,"yyyyMMdd"));//跑批当月最后一天params.put("QUA_BEF_DATE",curDateFB_YM);//跑批前一季params.put("QUA_DATE",curDateFN_YM);//跑批当季params.put("QUA_DATE_FIR_MON",curDateFN_YM_FM);//跑批当季第一月params.put("QUA_DATE_LAS_MON",curDateFN_YM_EM);//跑批当季最后一月params.put("YEA_HAF_DATE",curDateHN);//跑批半年params.put("YEA_HAF_FIR_MON",curDateHN_FM);//跑批半年第一个月params.put("YER_BEF_DATE",curDateYB_Y);//跑批前一年params.put("YER_DATE",curDateY_Y);//跑批当年params.put("YER_DATE_FIR_DAY",curDateY_Y + "0101");//跑批当年第一天params.put("YER_DATE_FIR_MON",curDateY_Y + "01");//跑批当年第一月params.put("YER_DATE_LAS_DAY",curDateY_Y + "1231");//跑批当年最后一天params.put("YER_DATE_LAS_MON",curDateY_Y + "12");//跑批当年最后一月//modify begin by gh @date: 2020/7/15 跑批 星期 1到7对应周日到周六params.put("WEEK",String.valueOf(DateUtil.dayOfWeek(curDate)));//跑批 星期 1到7对应周日到周六//modify end by gh @date: 2020/7/15String retMsg = "0000:任务执行成功!";for(String key : params.keySet()){EasyJobLogger.log("key="+key+"and value="+params.get(key));}KettleUtilsImp kettleUtilsImp = new KettleUtilsImp();if(KettleTools.isJob(kettleFile)){retMsg = kettleUtilsImp.callNativeJob(kettleFile,params);}else if (KettleTools.isKtr(kettleFile)){retMsg = kettleUtilsImp.callNativeTrans(kettleFile,params);}else{throw new IOException("不支持的kettle脚本类型!");}if(!"0000:任务执行成功!".equals(retMsg)){throw new IOException(retMsg + "kettle执行失败!");}EasyJobLogger.log("kettle日志"+retMsg,DateUtil.now());EasyJobLogger.log("EASY-JOB,Kettle批量任务结束.{}", DateUtil.now());return SUCCESS;}}
注意: 在kettle中可以执行中可以直接使用许多日期变量。kettleJobHandler调用kettle时,会传入参数。
params.put("DAYDATE",batDate); //日期参数 params.put("DAY_BEF_DATE",DateUtil.format(curDateB,"yyyyMMdd"));//跑批前一天 params.put("DAY_DATE",batDate);//跑批当天 params.put("MON_BEF_DATE",DateUtil.format(curDateBM,"yyyyMM")); //跑批前一月 params.put("MON_BEF_LAS_DATE",DateUtil.format(curDateBME,"yyyyMMdd"));//跑批前一月最后一天 params.put("MON_DATE",DateUtil.format(curDate,"yyyyMM"));//跑批当月 params.put("MON_DATE_FIR_DAY",DateUtil.format(curDateMB,"yyyyMMdd"));//跑批当月第一天 params.put("MON_DATE_LAS_DAY",DateUtil.format(curDateME,"yyyyMMdd"));//跑批当月最后一天 params.put("QUA_BEF_DATE",curDateFB_YM);//跑批前一季 params.put("QUA_DATE",curDateFN_YM);//跑批当季 params.put("QUA_DATE_FIR_MON",curDateFN_YM_FM);//跑批当季第一月 params.put("QUA_DATE_LAS_MON",curDateFN_YM_EM);//跑批当季最后一月 params.put("YEA_HAF_DATE",curDateHN);//跑批半年 params.put("YEA_HAF_FIR_MON",curDateHN_FM);//跑批半年第一个月 params.put("YER_BEF_DATE",curDateYB_Y);//跑批前一年 params.put("YER_DATE",curDateY_Y);//跑批当年 params.put("YER_DATE_FIR_DAY",curDateY_Y + "0101");//跑批当年第一天 params.put("YER_DATE_FIR_MON",curDateY_Y + "01");//跑批当年第一月 params.put("YER_DATE_LAS_DAY",curDateY_Y + "1231");//跑批当年最后一天 params.put("YER_DATE_LAS_MON",curDateY_Y + "12");//跑批当年最后一月 params.put("WEEK",String.valueOf(DateUtil.dayOfWeek(curDate)));//跑批 星期 1到7对应周日到周六
配置调度中心任务
1. 查询tb_job_group 确认执行器id。2. 确定任务在日终任务当中位置。触发任务,依赖任务,子任务。3. 插入tb_job_task* id 字段 主键id 由系统名+分组+排序组成,后面必须是数据字结尾。* JOB_GROUP 字段 对应执行器id。* executor_handler 字段,值 kettleJobHandler。* executor_param 字段,传入kettleJobHandler调度中的执行,传入是一个json格式,必须包含一个kettleName值,指kettle绝对路径。
例如:
{"kettleName":"/app/tomcat/scms/easyloan/kettle/dataApplication/TB_CSM_GRO_COM.ktr"}
可以加入其他信息传入 kettleJobHandler。 例如:
{"kettleName":"/app/tomcat/scms/easyloan/kettle/dataApplication/TB_CSM_GRO_COM.ktr","SQLPathKuanBiao":"/app/tomcat/scms/easyloan/kettle/dataApplication","SQLPathApy":"/app/tomcat/scms/easyloan/kettle/dataApplication","SQLPathReport":"/app/tomcat/scms/easyloan/kettle/dataApplication","CsmFilePath":"/app/scms/datafile/send/ecif"}
* job_cron 字段值默认 0/1 * * * * ? * 意思是 1秒触发;Linux|014|cron表达式详解 。* child_jobid 字段, 触发任务id,多个任务用英文 逗号隔开“,”。* dependency_id 字段,依赖任务,多个任务用英文 逗号隔开“,”。当父任务执行完后,触发子任务时,检查依赖任务是否执行成功。所有依赖执行成功才能执行当前任务。
