客户端中添加SchedulerX依赖的JAR包,实现JobProcessor接口,即可接入SchedulerX。

创建Java调度任务

创建Java调度任务的详细操作步骤请参见创建调度任务。本文仅介绍Java任务特有的配置。

创建任务-基本信息-Java
  • 任务类型Java
  • Processor类名:即实现JobProcessor类的全路径名,例如com.alibaba.schedulerx.test.processor.HelloWorldJob
  • 执行方式:Java任务类型支持单击运行、广播运行、并行计算、内存网格、网格计算和分片运行6种执行模式。
    • 单机广播需要实现JavaProcessor。
    • 并行计算内存网格网格计算分片运行需要实现MapJobProcessor。

编程模型

Java任务支持两种编程模型:JavaProcessor和MapJobProcessor。

  • JavaProcessor
    • public void preProcess(JobContext context) throws Exception
    • public ProcessResult process(JobContext context) throws Exception
    • public void preProcess(JobContext context)
    • public void kill(JobContext context)
  • MapJobProcessor
    • public ProcessResult process(JobContext context) throws Exception
    • public void postProcess(JobContext context)
    • public void kill(JobContext context)
    • public ProcessResult map(List<? extends Object> taskList, String taskName)

ProcessResult

每个process需要返回ProcessResult,用来表示任务执行的状态、结果和错误信息。

  • 任务运行成功:return new ProcessResult(true)
  • 任务运行失败:return new ProcessResult(false, ErrorMsg)或者直接抛异常。
  • 任务运行成功并且返回结果:return new ProcessResult(true, result)result是一个字符串,不能大于1000字节。

HelloSchedulerx2.0任务示例

@Component
public class MyProcessor1 extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        //TODO
        System.out.println("Hello, schedulerx2.0!");
        return new ProcessResult(true);
    }
}            

支持Kill功能的任务示例

@Component
public class MyProcessor2 extends JavaProcessor {
    private volatile boolean stop = false;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        int N = 10000;
        while (!stop && N >= 0) {
            //TODO
            N--;
        }
        return new ProcessResult(true);
    }

    @Override
    public void kill(JobContext context) {
        stop = true;
    }

    @Override
    public void preProcess(JobContext context) {
        stop = false;  //如果是通过Spring启动,Bean是单例,需要通过preProcess把标记为复位
    }
}          

通过Map模型跑批任务示例

/**
 * 对一张单标进行分布式跑批
 * 1. 根任务先查询一张表,获取minId,maxId
 * 2. 构造PageTask,通过map进行分发
 * 3. 下一级获取到如果是PageTask,则进行数据处理
 *
 */
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
    private static final int pageSize = 100;

    static class PageTask {
        private int startId;
        private int endId;
        public PageTask(int startId, int endId) {
             this.startId = startId;
             this.endId = endId;
        }
        public int getStartId() {
              return startId;
        }
        public int getEndId() {
              return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            System.out.println("start root task");
            Pair<Integer, Integer> idPair = queryMinAndMaxId();
            int minId = idPair.getFirst();
            int maxId = idPair.getSecond();
            List<PageTask> taskList = Lists.newArrayList();
            int step = (int) ((maxId - minId) / pageSize); //计算分页数量
            for (int i = minId; i < maxId; i+=step) {
                taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
            }
            return map(taskList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            PageTask record = (PageTask)task;
            long startId = record.getStartId();
            long endId = record.getEndId();
            //TODO
            return new ProcessResult(true);
        }

        return new ProcessResult(true);
    }

    @Override
    public void postProcess(JobContext context) {
        //TODO
        System.out.println("all tasks is finished.");
    }

    private Pair<Integer, Integer> queryMinAndMaxId() {
        //TODO select min(id),max(id) from xxx
        return null;
    }

}