全部产品

集群任务限流

为避免负载过重,控制数据消费的速率,集群任务提供了动态和静态两种流控能力。

集群任务限流原理

1621487022318-e23ce1d5-58fc-46d7-bc63-f75d282b6b79如上图所示,一个 Chunk(分片)的执行分为如下三个阶段,这三个阶段构建成一个循环。

  1. Read 阶段

    Reader 读取一批数据,读取数据接口返回数据列表和后续是否有新的数据标志。

    Reader 根据 limiter 的限流速率向 Processor 推送数据。

  2. Process 阶段

    Processor 接收到数据后,通过多线程的方式对这批数据进行处理。处理后通过队列推送至 Writer。

  3. Write 阶段

    Writer 接收到数据后,通过多线程的方式对数据进行写操作。数据写操作完成之后,Writer 根据是否有新数据标志进行判断是否有数据,如果还有数据,则会继续读取。

限流类型

为避免 Process 阶段和 Write 阶段线程处理过程复杂,数据存放到 Process 阶段的队列时会增加一个限流器,控制数据读取和存放的速率。任务调度提供了两种限流类型:

  • 静态限流

    静态限流通过代码配置限流规则,并可通过页面动态调整限流速率。优势是在页面没有配置限流的情况下,也可以通过限流规则的初始值进行限速,但需要您手动指定限流器。

    • 必须在代码中配置限流速率初始值。

    • 可通过页面进行动态调整限流速率。

    • 必须指定限流器。

  • 动态限流

    动态限流通过页面去调整限流速率,不管是否提供限流器,动态限流都可以正常工作。 如果没有指定限流器,任务调度客户端使用默认的 DefaultLimiter 限流器。  

    • 必须在页面上配置限流速率。

    • 通过页面调整限流速率。

    • 不指定限流器时,任务调度客户端使用默认的 DefaultLimiter 限流器。

集群任务限流配置

静态限流

静态限流就是在代码中指定限流规则,这样即使调度管控台没有配置,也有一个初始化的限流规则。 框架提供了默认的限流器  DefaultLimiter,指定限流器示例如下:

该示例表示每秒处理 10 条数据。

public class ClusterJobExecuteOneHandler implements IClusterJobExecuteHandler<Integer, Integer> {

    ... // 省略

    @Override
    public ILimiter getLimiter() {
        // 静态配置写死每秒处理10条数据
        return new DefaultLimiter(10);
    }
   
    ...   // 省略
}

如果此处设置限流,返回 null,并且在调度平台也没有设置限流规则,则表示不限速。

调度框架提供了扩展接口,支持用户自定义限流规则,详情请参见 限流扩展 部分。

动态限流

不管用户客户端是否指定了 ILimiter 限流器,都可以通过管控页面动态调整限流值。如果用户在客户端中没有指定 ILimiter,那么使用框架提供的默认限流器 DefaultLimiter,调度管控页面不配置的限流不生效。配置方式有以下两种:

全局生效

任务配置限流是全局生效,配置后对之后的每次任务触发都生效。

  1. 在创建集群任务页面,开启 高级选项

  2. 单台机器最大处理速率 文本框,输入每秒处理数据条数。

    边框
  3. 配置信息完成后,单击 提交

本次触发生效

仅对本次任务触发生效。

  1. 进入任务详情页面,定位对应的触发记录,单击右下角 处理数据量 旁的编辑图标。

    数据
  2. 输入每秒处理数据条数。

    2
  3. 单击右侧的对号图标。

设置完成后立刻生效,这样客户端就可以按指定的速率执行。

限流扩展

如果默认的限流器 DefaultLimiter 无法满足用户需求,比如根据业务逻辑限流、根据 CPU、内存、IO 情况限流等,限流模式提供了扩展接口,支持用户自定义限流规则。用户实现 ILimiter 接口,并在 getLimiter() 方法中返回实例即可。ILimiter 接口说明如下:

public interface ILimiter {

    /**
     * 设置限流速率, 当permitsPerSecond为NULL或<=0时表示不限速
     *
     * @param permitsPerSecond
     */
    void setRate(Integer permitsPerSecond);

    /**
     * 获取许可,阻塞直到获取到许可
     *
     * @return
     */
    void acquire();
}
  • setRate() 方法: 当管控出现一些配置变更时,框架会调用该方法告知用户。 如果管控没有配置,则 permitsPerSecond 为 null。这部分的实现可以参考 DefaultLimiter 的实现逻辑。

  • acquire() 方法:获取许可,如果不允许则阻塞。

如果想快速实现自定义限流器,可以参考  DefaultLimiter 的实现。

最佳实践

  • 如果只想通过管控页面动态控制,可以提供限流器。 getLimiter() 直接返回 null 即可。

  • 如果初始化启动时就需要限流,那么可以指定限流器和速率,首选 DefaultLimiter。

  • 如果 DefaultLimiter 无法满足业务诉求,或者业务有更加负责的限流规则(比如根据当前机器的 CPU、内存、线程等情况做限流)那需要用户自实现一个限流器。