全部产品

集群任务

集群任务支持用户按业务的要求,通过多层的拆分将一个任务拆分到多个客户端上并发执行。

集群任务的开发可以分成两个阶段:拆分阶段和执行阶段。

  • 拆分阶段:对数据进行分片,不限制拆分层数,将拆分结果上报给服务端,由服务端根据拆分的 chunk 通知客户端来拉取数据进行处理。(chunk:一批待处理数据的索引集合)

  • 执行阶段:客户端接收到通知后拉取数据进行处理,处理完后继续拉取新的数据,直到数据都处理完成。

应用场景案例

为了便于理解,本文使用一个场景示例来介绍集群任务的开发过程。

假设某基金公司每天需要进行一次用户清算,由于用户规模较大,因此将用户分为 100 张表,每张表有 10 万左右的用户数据。该公司选择使用两层拆分的集群任务,通过集群任务并行处理能力提高数据处理效率。

具体的实现步骤如下:

  1. 任务拆分阶段:将用户数据进行拆分,详见 集群任务拆分阶段

    • 第一层拆分:按用户表维度进行数据拆分。

    • 第二层拆分:按分页维度进行数据拆分。

  2. 任务执行阶段:对每个用户数据进行处理,详见 集群任务执行阶段

    • 执行模式:支持本地执行模式以及远程执行模式。

    • 线程池配置:支持使用默认线程池配置或自定义线程池配置。

  3. 控制台配置集群任务:参见 新建任务集群任务拆分

任务拆分阶段

任务调度提供了 IClusterJobSplitHandler 接口进行任务拆分,目前支持两种拆分方式:

  • ShardingChunkData 拆分:指定索引的拆分方式。具体使用代码,可参考 工程示例 中的 ClusterFstSplitHandler

  • RangeChunkData 拆分:指定范围的拆分方式。具体使用代码,可参考 工程示例 中的 ClusterSecSplitHandler

ShardingChunkData 拆分

ShardingChunkData 拆分是指定索引的拆分方式。通过索引对这个子任务(chunk)进行唯一标识,客户端会根据指定的索引拉取数据进行处理。要求填充分片规则。

public class ShardingChunkData implements IChunkData {
    /**
     * 分片号
     */
    private String shardingRule;

......
}

其中,shardingRule 是一个分片规则标识,例如可以使用 user_00、user_01 … user_99 作为分片标识。

代码示例

结合上述场景案例,第一层任务拆分可以使用 shardingChunkData 对 100 张用户表进行拆分,具体代码如下:

public class UserSplitterByTable implements IClusterJobSplitHandler<ShardingChunkData> {

    @Override
    public SplitChunkDataResult<ShardingChunkData> handle(ClusterJobSplitContext context) {
        SplitChunkDataResult<ShardingChunkData> splitChunkDataResult = new SplitChunkDataResult<>();
        ArrayList<ShardingChunkData> shardingChunkDatas = new ArrayList<>();
        // user_00 ~ user_99
        for (int i = 0; i < 100; i++) {
            String shardingRule = "user_";
            if (i < 10) {
                shardingRule = shardingRule +"0";
            }
            shardingRule = shardingRule + i;
            // 根据表维度拆分分片
            ShardingChunkData shardingChunkData = new ShardingChunkData(shardingRule);
            shardingChunkDatas.add(shardingChunkData);
        }
        splitChunkDataResult.setChunkDatum(shardingChunkDatas);
        splitChunkDataResult.setSuccess(true);
        return splitChunkDataResult;
    }

    @Override
    public String getName() {
       // 处理器名称
        return "USER_SPLITTER_BY_TABLE";
    }

    @Override
    public ThreadPoolExecutor getThreadPool() {
       // 线程池,建议使用自定义线程池。如果返回null则使用 sdk 自带的线程池。
        return null;
    }
}

RangeChunkData 拆分

RangeChunkData 拆分是指定范围的拆分方式。指定每个子任务处理特定范围内的数据,类似于一个分页效果。要求填充起始索引、结束索引和分片规则。

public class RangeChunkData implements IChunkData {
    /**
     * 分片规则
     */
    private String shardingRule;

    /**
     * 起始索引
     */
    private String start;

    /**
     * 结束索引
     */
    private String end;

......
}
  • shardingRule:分片规则,例如指定表 user_01。

  • start:开始索引,记录开始位置,例如 1000。

  • end:结束索引,记录结束位置(包含),例如 2000。

例如,以下配置表示处理 user_01 表中 1000 到 2000 的数据。

RangeChunkData chunk =new RangeChunkData ();
chunk.setShardingRule("user_01");
chunk.setStart("1000");
chunk.setEnd("2000");

代码示例

在场景案例中,第二层可以使用 RangeChunkData 将用户数据按分页维度进行拆分,具体代码如下:

public class UserSplitterByPage implements IClusterJobSplitHandler<RangeChunkData> {

    @Override
    public SplitChunkDataResult<RangeChunkData> handle(ClusterJobSplitContext context) {
        // 第一层拆分的分片
        ShardingChunkData shardingChunkData = (ShardingChunkData) context.getChunkData();
        String shardingRule = shardingChunkData.getShardingRule();

        // 1. 根据分片查询数量
        int count = queryCountByTable(shardingChunkData.getShardingRule());
        SplitChunkDataResult<RangeChunkData> splitChunkDataResult = new SplitChunkDataResult<>();
        ArrayList<RangeChunkData> shardingChunkDatas = new ArrayList<>();

        // 2. 做分页处理,每页处理 1000 条
        int pageCount = 1000;
        for (int page = 0; page < count / pageCount; page++) {
            String startRows = String.valueOf(page * pageCount);
            // 包含
            String endRows = String.valueOf((page + 1) * pageCount - 1);
            RangeChunkData rangeChunkData = new RangeChunkData(shardingRule, startRows, endRows);
            shardingChunkDatas.add(rangeChunkData);
        }

        splitChunkDataResult.setChunkDatum(shardingChunkDatas);
        splitChunkDataResult.setSuccess(true);
        return splitChunkDataResult;
    }

    // mock
    private int queryCountByTable(String shardingRule) {
        if ("user_00".equals(shardingRule)) {
            // user_00 表 10 万用户
            return 100000;
        } else {
            // 其他表 9 万用户
            return 90000;
        }
    }

    @Override
    public String getName() {
        return "USER_SPLITTER_BY_PAGE";
    }

    @Override
    public ThreadPoolExecutor getThreadPool() {
        return null;
    }
}

任务执行阶段

集群任务执行阶段分为三个子阶段:

  • Read 阶段:从数据源读取数据。

  • Process 阶段:将读取的数据对象转换为要写入数据源的对象,该阶段不是必须阶段,可以不设置。

  • Write 阶段:将数据写入数据源。

    说明

    集群任务拆分阶段的相关代码,可参考 任务调度工程示例 中的 ClusterExecuteHandler

Read 阶段

读取数据服务需要实现接口 IReader

public interface IReader<T>{

     /**
     * 读取数据
     *
     * @param context
     * @return
     */
        LoadData<T> read(ClusterJobExecuteContext context) throws Exception;
}

LoadData 对象数据结构如下:

public class LoadData<T> extends MultiDataItem<T>{

        /**
         * 是否还有未捞取的待处理数据,值为false时,处理完本次捞取出的数据就回调服务端;当值为true时,认为还有数据待处理,处理完本次捞取数据后继续捞取数据。
         */
        private boolean hasMore;

        public LoadData(List<T> itemList,boolean hasMore){
            super(itemList);
            this.hasMore=hasMore;
        }

        public static boolean isEmpty(LoadData loadData){
            return loadData==null || loadData.isEmpty();
        }

        public boolean isHasMore(){
            return hasMore;
        }

        public void setHasMore(boolean hasMore){
            this.hasMore=hasMore;
        }
}

使用 read 方法分批读取数据时,需要注意设置 hasMore 的值,当值为 true 时,处理完本次读取的数据后会再次进行读取数据。

如果需要打开关闭文件,可以使用 IStreamReader 接口:

public interface IStream {

    /**
     * 打开流
     *
     * @param context
     * @throws Exception
     */
    void open(ClusterJobExecuteContext context) throws Exception;

    /**
     * 关闭流
     *
     * @param context
     * @throws Exception
     */
    void close(ClusterJobExecuteContext context) throws Exception;
}

public interface IStreamReader<T> extends IStream, IReader<T> {

}

该接口继承自 IStream 接口,提供了 open 和 close 方法,这两个方法分别会在执行分片前和执行分片后调用。

Process 阶段

处理数据服务需要实现接口 IProcessor

public interface IProcessor<I, O> {

    /**
     * 数据处理,将读取出的对象加工转换为要处理的对象
     *
     * @param r
     * @return
     */
    DataProcessResult<O> process(ClusterJobExecuteContext context, I i) throws Exception;

    /**
     * 数据处理线程池,没有配置时使用handler的线程池
     *
     * @return
     */
    ThreadPoolExecutor getProcessThreadPool();
}

当不需要对读取的数据进行转换时,可以不设置该服务,这样会直接把读取的数据交给写服务。该服务可以单独指定线程池,当没有设置时默认使用任务 handler 的线程池。

Write 阶段

写数据服务需要实现接口 IWriter

public interface IWriter<T> {

    /**
     * 每次执行的数据块包含的数据量最大值
     *
     * @return
     */
    int getCountPerWrite();

    /**
     * 写数据
     *
     * @param context
     * @param dataItem
     * @return
     * @throws Exception
     */
    ClientCommonResult write(ClusterJobExecuteContext context,
                             IDataItem<T> dataItem) throws Exception;

    /**
     * 写数据线程池,没有配置时使用handler的线程池
     *
     * @return
     */
    ThreadPoolExecutor getWriteThreadPool();
}

getCountPerWrite 方法用来设置每次批量 write 的数据量,值小于等于 0 时认为一次只写入一条数据。该服务可以单独设置线程池,当没有设置时默认使用任务 handler 的线程池。

IDataItem 就是待处理的数据对象,有两种类型:

  • SingleDataItem:只包含一条数据,当 getCountPerWrite 方法的返回值小于或等于 1 时是该类型。

  • MultiDataItem:包含多条数据,当 getCountPerWrite 方法的返回值大于 1 时是该类型。

可根据实际情况对 dataItem 进行类型转换,例如:

switch(dataItem.getType()){
    case SINGLE:
         ((SingleDataItem<Integer>)dataItem).getItem();
         break;
    case MULTIPLE:
         (MultiDataItem<Integer>)dataItem).getItemList();
         break;
    default:
         break;
}

如果需要打开关闭文件,可以使用接口 IStreamWriter

public interface IStreamWriter<T> extends IStream,IWriter<T>{

}

该接口继承了 IStream 接口,提供了 open 和 close 方法,这两个方法分别会在执行分片前和执行分片后调用。

代码示例

结合上述应用场景案例,在执行阶段,需要根据拆分的分片从数据库或其他数据源中拉取数据进行处理,然后落库。

  • Read 阶段:根据分片信息(shardingRule、start、end)到指定表里拉取指定范围的用户数据。

  • Process 阶段:对拉取到的用户数据进行处理。

  • Write 阶段:将处理后的数据落库。

具体代码如下:

public class UserProcessHandler implements IClusterJobExecuteHandler {
    private final Logger LOGGER = LoggerFactory.getLogger(ClusterJobExecutor.class);
    private ThreadPoolExecutor threadPool;

    @Override
    public void preExecute(ClusterJobExecuteContext context) {
        // 前置处理
        LOGGER.info(String.format("preExecute chunkId:%s, param:%s",
                context.getChunkId(), context.getCustomParams()));
    }

    @Override
    public IReader getReader() {
        return new IReader() {
            @Override
            public LoadData<String> read(ClusterJobExecuteContext context) throws Exception {
                IChunkData chunkData = context.getChunk();
                RangeChunkData rangeChunkData = (RangeChunkData) chunkData;

                // 根据索引去查询数据
                String shardingRule = rangeChunkData.getShardingRule();
                int start = Integer.parseInt(rangeChunkData.getStart());
                int end = Integer.parseInt(rangeChunkData.getEnd());

                // 模拟查询数据
                List<String> stringList = queryUserInfo(shardingRule, start, end);

               // 一次查询,后面没有数据
                boolean hasMore = false;
                return new LoadData<String>(stringList, hasMore);
            }

            private List<String> queryUserInfo(String shardingRule, int start, int end) {
                List<String> stringList = Lists.newArrayList();
                for (int userId = start; userId <= end; userId++) {
                    stringList.add(shardingRule + "" + userId);
                }
                return stringList;
            }
        };
    }

    @Override
    public IProcessor getProcessor() {
        return new IProcessor() {
            @Override
            public DataProcessResult process(ClusterJobExecuteContext context,
                                             Object data) throws Exception {
                // 处理数据
                System.out.println("process user" + data);
                return new DataProcessResult(true, "", data);
            }

            @Override
            public ThreadPoolExecutor getProcessThreadPool() {
                return null;
            }
        };
    }

    @Override
    public IWriter getWriter() {
        return new IWriter<String>() {
            @Override
            public int getCountPerWrite() {
                return 1;
            }

            @Override
            public ClientCommonResult write(ClusterJobExecuteContext context,
                                            IDataItem<String> dataItem)throwsException {
                // 数据存储
                switch (dataItem.getType()) {
                    case SINGLE:
                        // 单个数据块,只包含一条数据
                        SingleDataItem<String> singleDataItem = (SingleDataItem<String>) dataItem;
                        LOGGER.info(String.format("getWriter write single data:%s", singleDataItem.getItem()));
                        System.out.println(String.format("write single data:%s", singleDataItem.getItem()));
                        break;
                    case MULTIPLE:
                        // 复合数据块,包含多条数据。比如任务停止时会将多条数据传入
                        MultiDataItem<String> multiDataItem = (MultiDataItem<String>) dataItem;
                        LOGGER.info(String.format("getWriter write multi data:%s", multiDataItem.getItemList()));
                        break;
                    default:
                        break;
                }
                return ClientCommonResult.buildSuccessResult();
            }

            @Override
            public ThreadPoolExecutor getWriteThreadPool() {
                return BlockingThreadPool.getThreadPool();
            }
        };
    }

    @Override
    public ILimiter getLimiter() {
        return new DefaultLimiter(10);
    }

    @Override
    public void postExecute(ClusterJobExecuteContext context) {
// 后置处理
        LOGGER.info(String.format("JobExecuteHandler postExecute chunkId:%s, param:%s",
                context.getChunkId(), context.getCustomParams()));
    }

    @Override
    public Progress calProgress(ClusterJobExecuteContext context) {
        return new Progress();
    }

    @Override
    public boolean isProcessAsync() {
        return true;
    }

    @Override
    public String getName() {
        return "USER_PROCESS_HANDLER";
    }

    @Override
    public ThreadPoolExecutor getThreadPool() {
        return this.threadPool;
    }

    public void setThreadPool(ThreadPoolExecutor threadPool) {
        this.threadPool = CommonThreadPool.getThreadPool("REMOTE_EXECUTE");
    }
}

执行模式

集群任务的执行阶段支持两种模式:

本机执行

本机执行时需要实现接口 IClusterJobExecuteHandler,如下:

public interface IJobHandler {

    /**
     * handler的名字
     *
     * @return
     */
    String getName();

    /**
     * 可以留空, 使用默认执行线程池
     *
     * @return
     */
    ThreadPoolExecutor getThreadPool();

}

public interface IClusterJobExecuteHandler<I, O> extends IJobHandler {

    /**
     * 预处理
     *
     * @param context
     */
    void preExecute(ClusterJobExecuteContext context);

    /**
     * 获取数据读取服务
     *
     * @return
     */
    IReader<I> getReader();

    /**
     * 获取数据清洗服务,返回NULL时会跳过数据清洗步骤,直接处理读取出的数据
     *
     * @return
     */
    IProcessor<I, O> getProcessor();

    /**
     * @return
     */
    IWriter<O> getWriter();

    /**
     * 获取限流器,返回NULL时使用默认限流器
     *
     * @return
     */
    ILimiter getLimiter();

    /**
     * 本次执行完的后置处理
     *
     * @param context
     * @return
     */
    void postExecute(ClusterJobExecuteContext context);

    /**
     * 计算处理进度
     *
     * @param context
     * @return
     */
    Progress calProgress(ClusterJobExecuteContext context);

    /**
     * 是否异步处理,开启异步执行后,reader-process-write阶段全部是异步处理
     *
     * @return
     */
    boolean isProcessAsync();
}

实现 IClusterJobExecuteHandler 的类负责处理任务。实现时需要设置:

  • IReader

  • IWriter

  • IProcessor:非必须设置,当没有设置 IProcessor 时会把读取出的数据直接交给 IWriter 服务。

  • ILimiter:没有设置时使用默认的限流服务。

当 isProcessAsync 返回 true 时,reader-process-write 阶段全部是异步处理,即读取的数据放到队列给 process 或 write 消费后会立即开始下一次读取。当返回 false 时,读取的数据需要等待被 write 消费完后开始下一次读取。

注意

IReader 服务如果配置成了 bean,需要确保服务是无状态的,否则多线程场景下会相互干扰。如果需要存储一些处理过程中的数据,那么在需要在 getReader 方法里 new 一个 IReader 服务实例。IWriter 服务也是如此。

远程执行

远程执行时需要实现接口 IRemoteClusterJobExecuteHandler,如下:

public interface IRemoteClusterJobExecuteHandler<T> extends IClusterJobExecuteHandler<T>{

        /**
         * 设置路由策略,没有设置时默认为轮询
         *
         * @return
         */
        IClusterRouter getClusterRouter();

        /**
         * 分发完一次load出来的数据后的休眠时间,防止分发出去的数据还没有被处理完导致的重复捞取,单位是ms,返回值<=0时认为不休眠
         *
         * @return
         */
        int getSleepAfterPerLoad();
}

该接口继承自 IClusterJobExecuteHandler,需要额外两个方法:getClusterRouter 和 getSleepAfterPerLoad。

  • 当远程处理数据时,客户端会把捞取的数据分发给业务集群内的其它机器进行处理,getClusterRouter 方法用来设置分发时的路由规则,目前支持随机(RandomClusterRouter)和轮询(RoundRobinClusterRouter)两种方式,当方法返回 null 时默认为轮询。

  • getSleepAfterPerLoad 方法用来设置处理完读取的数据后的休眠时间,防止分发出去的数据还没有被处理完导致的重复捞取,单位时ms,仅当 isProcessAsync 返回 false 时生效。

集群内的远程 RPC 调用使用 oneway 模式,所以无法获取数据的处理结果,需要业务方自己记录处理结果。如需帮助,请 提交工单 咨询。

可以通过实现接口 IRemoteProcessorExecutor 来设置接收远程分发请求的线程池,将实现类发布成 bean 即可,接口如下:

public interface IRemoteProcessorExecutor{

/**
     * 获取线程池
     *
     * @return
     */
     Executor getExecutor();
}

另外,远程调用需要在客户端启动一个 RpcServer:

  • 对于 SOFABoot 项目,在工程的 application.properties 文件中使用配置项:

    com.alipay.sofa.antscheduler.remote.execute.enable=true
    com.alipay.sofa.antscheduler.remote.execute.port=xxx
  • 对于非 SOFABoot 项目,需要在初始化客户端的时候设置下面两个参数:

    public class Config{
    
    ......
    
    /**
       * 是否支持远程执行
       */
    private boolean isEnableRemoteExecute =false;
    
    /**
       * 远程服务端口,没有设置时使用默认端口9989
       */
    private int  remoteServerPort;
    ......
    }

线程池默认配置

IProcessor 和 IWriter 的线程池没有设置时,默认使用 IJobHandler 里设置的线程池。当 IJobHandler 也没有设置时,使用的是客户端默认指定的,配置参数为:

minPoolSize:20
maxPoolSize:300
queueSize:100
keepAliveTime:1小时

当没有设置 IRemoteProcessExecutor 时,默认使用 bolt 协议提供的默认线程池,配置参数为:

minPoolSize:20
maxPoolSize:400
queueSize:600
keepAliveTime:60秒

您也可以对 IProcessor,IWriter,IJobHandler 以及 IRemoteProcessExecutor 自定义设置线程池。