全部产品

定时任务--快速入门

更新时间:2020-07-23 15:41:21

本文介绍完成定时任务所涉及的整个流程,主要包括下述几个部分:

本地开发

在本地使用 SOFABoot 框架实现定时服务,主要包括下述步骤:

  1. 搭建环境
  2. 创建 SOFABoot Web 工程,可采用下述任一方式:
  3. 引入依赖:在接收定时任务的模块(例如示例工程的 app/endpoint 模块)的 pom.xml 中添加如下依赖。
    • RPC 任务须引入的依赖为:
      1. <dependency>
      2. <groupId>com.alipay.sofa</groupId>
      3. <artifactId>scheduler-enterprise-sofa-boot-starter</artifactId>
      4. </dependency>
    • 消息任务须引入的依赖为:
      1. <dependency>
      2. <groupId>com.alipay.sofa</groupId>
      3. <artifactId>mq-enterprise-sofa-boot-starter</artifactId>
      4. </dependency>
  4. 业务逻辑编写:不同定时业务,需要实现不同的接口,说明如下。

    • 简单 RPC 任务:要实现接口 ISimpleJobHandler。接口内的所有方法:
      • getName():返回字符串,必须跟页面配置的任务名称一致。
      • getThreadPool():执行该任务所用的线程池,说明如下。
        • 允许每个任务处理器注入独立的线程池。
        • 也允许多个任务处理器共享一个线程池。
        • 如果返回 null,则使用公共的默认线程池。
      • handle(JobExecuteContext context) 编写任务执行逻辑,需返回 ClientCommonResult 表示任务执行结果。
    • 消息任务:要实现接口 UniformEventMessageListener。主要待实现方法为 onUniformEvent()

    • 集群任务:分为 2 个层次。

      • 拆分层:要实现接口 IClusterJobSplitHandler,接口待实现的方法说明如下。
        • handle:实现业务逻辑的主要方法。入参 JobExecuteContext 包含了任务的调度信息,也可以获取自定义参数等。
        • getName:定义handler 名字
        • getThreadPool:自定义线程池。如果没有定义,则默认使用框架自带的。推荐使用自定义的。
      • 执行层:要实现接口 IClusterJobExecuteHandler,逻辑上分为 3 步。
        • read:读数据。
        • process:处理数据。
        • write:写数据。

    上述业务逻辑示例代码如下:

    展开查看:简单 RPC 示例
    1. public class SchedulerDemo implements ISimpleJobHandler {
    2. ThreadPoolExecutor threadPoolExecutor;
    3. @Override
    4. public String getName() {
    5. return "DEMOAPP_DEMOTASK";
    6. }
    7. @Override
    8. public ThreadPoolExecutor getThreadPool() {
    9. return threadPoolExecutor;
    10. }
    11. @Override
    12. public ClientCommonResult handle(JobExecuteContext context) {
    13. boolean success = true;
    14. // 业务逻辑代码
    15. if (success) {
    16. return ClientCommonResult.buildSuccessResult();
    17. } else {
    18. return ClientCommonResult.buildFailResult("handle failed");
    19. }
    20. }
    21. public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
    22. this.threadPoolExecutor = threadPoolExecutor;
    23. }
    24. }
    展开查看:消息任务示例
    1. public class MsgJobDemo implements UniformEventMessageListener {
    2. /**
    3. * logger
    4. */
    5. private static final Logger LOGGER = LoggerFactory.getLogger(MsgJobDemo.class);
    6. @Override
    7. public void onUniformEvent(UniformEvent uniformEvent, UniformEventContext uniformEventContext) throws Exception {
    8. // 所有消息定时任务的Topic 统一为 TP_F_SC
    9. final String topic = uniformEvent.getTopic();
    10. // 消息事件码 即是页面配置的 消息任务事件码
    11. final String eventCode = uniformEvent.getEventCode();
    12. // 接收触发后的定时业务处理
    13. LOGGER.info("[Receive an uniformEvent] topic {} eventcode {} eventId {} payload {}",
    14. new Object[]{topic, eventCode, uniformEvent.getId()});
    15. // todo 处理业务逻辑
    16. }
    17. }
    展开查看:集群任务-拆分层示例
    1. /**
    2. * 集群任务拆分层处理器 demo
    3. * 集群任务将一个任务分为多个任务,分发到业务集群中处理,提高任务处理效率。集群任务分拆分阶段和执行阶段,允许有多个拆分阶段,只有一个执行阶段。
    4. * 这个例子是展示集群拆分阶段的使用
    5. */
    6. @Component
    7. public class ClusterSplitHandlerDemo implements IClusterJobSplitHandler {
    8. private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSplitHandlerDemo.class);
    9. private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(20,
    10. 300, 1, TimeUnit.HOURS, new ArrayBlockingQueue<Runnable>(100) {
    11. });
    12. @Override
    13. public SplitChunkDataResult handle(ClusterJobSplitContext clusterJobSplitContext) {
    14. SplitChunkDataResult result = new SplitChunkDataResult();
    15. try {
    16. // 处理业务逻辑
    17. // 拆分多个分片返回
    18. List<IChunkData> chunkList = new ArrayList();
    19. for (int chunk = 0; chunk < 10; chunk++) {
    20. ShardingChunkData chunkData = new ShardingChunkData();
    21. chunkData.setShardingRule(String.valueOf(chunk));
    22. chunkList.add(chunkData);
    23. }
    24. result.setChunkDatum(chunkList);
    25. result.setSuccess(true);
    26. // 成功
    27. return result;
    28. } catch (Exception e) {
    29. // 处理失败
    30. LOGGER.error("split exception", e);
    31. result.setMsg(e.getMessage());
    32. result.setSuccess(false);
    33. return result;
    34. }
    35. }
    36. @Override
    37. public String getName() {
    38. return "CLUSTER_SPLIT_HANDLER_DEMO";
    39. }
    40. @Override
    41. public ThreadPoolExecutor getThreadPool() {
    42. return executor;
    43. }
    44. }
    展开查看:集群任务-执行层示例
    1. /**
    2. * 集群任务执行层处理器 demo
    3. * 集群任务执行阶段分为三步 read / process / write
    4. *
    5. * @author changwei.zjl
    6. * @version $Id: ClusterExecuteHandlerDemo.java, v 0.1 2020-07-12 20:33 changwei.zjl Exp $$
    7. */
    8. @Component
    9. public class ClusterExecuteHandlerDemo implements IClusterJobExecuteHandler {
    10. private static final Logger LOGGER = LoggerFactory.getLogger(ClusterExecuteHandlerDemo.class);
    11. private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(20,
    12. 300, 1, TimeUnit.HOURS, new ArrayBlockingQueue<Runnable>(100) {
    13. });
    14. @Override
    15. public void preExecute(ClusterJobExecuteContext clusterJobExecuteContext) {
    16. // 前置处理
    17. }
    18. /**
    19. * 读数据阶段
    20. *
    21. * @return
    22. */
    23. @Override
    24. public IReader getReader() {
    25. return new IReader() {
    26. @Override
    27. public Integer waitIntervalWhenNoData() {
    28. return null;
    29. }
    30. @Override
    31. public LoadData<String> read(ClusterJobExecuteContext context) throws Exception {
    32. List<String> stringList = Lists.newArrayList();
    33. IChunkData chunkData = context.getChunk();
    34. LOGGER.info(String.format(
    35. "JobExecuteHandler getReader executeId:%s, param:%s, chunk:%s",
    36. context.getExecuteId(), context.getCustomParams(), chunkData.parseToString()));
    37. // 读取数据
    38. int st = 0;
    39. int end = 10;
    40. for (int i = st; i < end; i++) {
    41. stringList.add(String.valueOf(i));
    42. }
    43. LOGGER.info(
    44. String.format("JobExecuteHandler loadData false executeId:%s, chunk:%s",
    45. context.getExecuteId(), chunkData.parseToString()));
    46. // 返回数据。 如果没有数据 则 hasMore 设置为false;如果还有数据则 返回 true,后面还会继续调度 read 方法读取数据。
    47. return new LoadData<String>(stringList, false);
    48. }
    49. };
    50. }
    51. /**
    52. * 处理阶段
    53. *
    54. * @return
    55. */
    56. @Override
    57. public IProcessor getProcessor() {
    58. return new IProcessor() {
    59. @Override
    60. public DataProcessResult process(ClusterJobExecuteContext context,
    61. Object data) throws Exception {
    62. // todo 处理业务逻辑
    63. return new DataProcessResult(true, "", data);
    64. }
    65. @Override
    66. public ThreadPoolExecutor getProcessThreadPool() {
    67. return null;
    68. }
    69. };
    70. }
    71. /**
    72. * 写阶段
    73. *
    74. * @return
    75. */
    76. @Override
    77. public IWriter getWriter() {
    78. return new IWriter<String>() {
    79. @Override
    80. public int getCountPerWrite() {
    81. return 1;
    82. }
    83. @Override
    84. public ClientCommonResult write(ClusterJobExecuteContext context,
    85. IDataItem<String> dataItem) throws Exception {
    86. switch (dataItem.getType()) {
    87. case SINGLE:
    88. LOGGER.info(String.format("getWriter write single data:%s",
    89. ((SingleDataItem<String>) dataItem).getItem()));
    90. // 保存单条数据
    91. break;
    92. case MULTIPLE:
    93. LOGGER.info(String.format("getWriter write multi data:%s",
    94. ((MultiDataItem<String>) dataItem).getItemList()));
    95. // 保存多条数据
    96. break;
    97. default:
    98. break;
    99. }
    100. LOGGER.info(String.format(
    101. "JobExecuteHandler write success executeId:%s, params:%s, chunk:%s",
    102. context.getExecuteId(), context.getCustomParams(),
    103. context.getChunk().parseToString()));
    104. return ClientCommonResult.buildSuccessResult();
    105. }
    106. @Override
    107. public ThreadPoolExecutor getWriteThreadPool() {
    108. return executor;
    109. }
    110. };
    111. }
    112. @Override
    113. public ILimiter getLimiter() {
    114. // 限流器,可以自定义,也可以使用框架自带的限流器
    115. return new DefaultLimiter(10);
    116. }
    117. @Override
    118. public void postExecute(ClusterJobExecuteContext clusterJobExecuteContext) {
    119. // 后置处理
    120. }
    121. /**
    122. * 集群分片在执行过程中,会每隔几秒调用一次。用来查询处理进度
    123. *
    124. * @param clusterJobExecuteContext
    125. * @return
    126. */
    127. @Override
    128. public Progress calProgress(ClusterJobExecuteContext clusterJobExecuteContext) {
    129. // 展示进度
    130. // 一般会查询当前分片的数据处理了多少
    131. Progress progress = new Progress();
    132. // 设置改分片下的数据总量
    133. int totalCount = 10;
    134. progress.setTotal(totalCount);
    135. // 设置已经处理成功的数量
    136. int successCount = 10;
    137. progress.setSucceed(successCount);
    138. return progress;
    139. }
    140. @Override
    141. public boolean isProcessAsync() {
    142. return false;
    143. }
    144. /**
    145. * 处理器名字
    146. *
    147. * @return
    148. */
    149. @Override
    150. public String getName() {
    151. return "CLUSTER_EXECUTOR_HANDLER_DEMO";
    152. }
    153. @Override
    154. public ThreadPoolExecutor getThreadPool() {
    155. return executor;
    156. }
    157. }
  5. Spring 配置:

    • 简单 RPC 任务:需要将接口实现类配置为 Spring Bean,有下述 2 种配置方式:

      • src/main/resources/META-INF/xxx/xxx-xxx.xml中进行配置:
        1. <bean id="schedulerDemo" class="com.antcloud.tutorial.scheduler.SchedulerDemo" />
      • 使用注解驱动(annotation-driven)的方式声明 Bean。
    • 消息任务:消费订阅端需要配置的内容包括下述几个方面。

      • 配置路径:src/main/resources/META-INF/xxx/xxx-xxx.xml
      • 配置内容:
        • 消息主题:例如 <sofa:channel value="TP_F_SC">
        • 消息事件码:例如 eventCode="EC_TASK_SCHEDULERTUTORIAL_MSG_DEMO"
        • 消息订阅组:例如 group="S_SCHEDULERTUTORIAL_MSG_DEMO"
        • sofa:listener 元素的 ref 属性:值为实现消息监听器的类名。例如 <sofa:listener ref="schedulerDemo"/>
      展开查看:消息任务配置示例
      1. <!-- consumer declaration, the id and group attribute are required and their value must be unique -->
      2. <sofa:consumer id="uniformEventSubscriber" group="S_SCHEDULERTUTORIAL_MSG_DEMO">
      3. <sofa:listener ref="schedulerDemo"/>
      4. <sofa:channels>
      5. <!-- channel value is the involved topic -->
      6. <sofa:channel value="TP_F_SC">
      7. <!-- each event represents a subscription -->
      8. <sofa:event eventType="direct" eventCode="EC_TASK_SCHEDULERTUTORIAL_MSG_DEMO" persistence="false"/>
      9. </sofa:channel>
      10. </sofa:channels>
      11. <sofa:binding.msg_broker/>
      12. </sofa:consumer>
      13. <!-- messageListener listener bean declaration, implements com.alipay.common.event.UniformEventMessageListener -->
      14. <bean id="msgJobDemo" class="com.alipay.sofa.endpoint.task.MsgJobDemo"/>
    • 集群任务:参考简单 RPC 任务的 xml 配置。
  6. 在云端发布前,请务必进行 application.properties 配置。更多详情,请参考 引入 SOFA 中间件。配置步骤如下:

    展开查看配置步骤

    1. 请前往 SOFAStack 控制台 > 研发效能 > 脚手架 > Step 2,示例如下:
      脚手架截图从中获取下述信息:
      • 实例标识:应用实例在工作空间中的唯一标识,在 application 中对应的 key 为:com.alipay.instanceid
      • AntVIP:应用通过 AntVIP 来获取各个组件的服务端地址,每个区域一个地址。在 application 中对应的 key 为 com.antcloud.antvip.endpoint。不同环境的 AntVIP 地址值,见下述规定。
        • 杭州金区VPC:100.103.201.136
        • 上海非金:100.103.1.174
    2. 脚手架 控制台,点击右上角用户图像,选择 AccessKey Management,在跳转的页面中即可获取访问控制对应的属性值。 这些属性在 application 中对应的 key 分别为:

      • Access Key IDcom.antcloud.mw.access
      • Access Secretcom.antcloud.mw.secret

        说明:如果需要创建 AccessKey,可点击 获取 AK。更多详情,请参见 创建 AccessKey

    3. 配置运行模式和运行环境,示例如下:

      1. run.mode=NORMAL
      2. com.alipay.env=shared
    4. 将上述属性键和值,配置在 application.properties 文件中。

云端发布

定时功能暂不支持通过本地启动应用的方式进行体验。如要体验该功能,必须先将应用发布到云端服务器。

  • 应用整体的发布流程,请参考 技术栈使用指南 中的 技术栈与应用发布流程
  • 应用的详细发布步骤,建议参考 经典应用服务 下的 快速入门

控制台配置

应用发布到云端后,您需要前往云端定时任务控制台进行定时任务的创建与管理,详情参见 新增与管理定时任务