Flink SQL是为了简化计算模型、降低您使用Flink门槛而设计的一套符合标准SQL语义的开发语言。本文为您介绍如何基于Dataphin研发Flink SQL类型的计算任务。

背景信息

项目绑定了Flink计算源,即可新建Flink SQL任务。Flink SQL任务支持处理离线与实时计算数据。

新建的Flink SQL任务默认开启实时模式,即只处理实时计算任务。您可以手动开启离线模式,处理离线计算任务。同时,Flink SQL任务支持处理离线和实时任务。关于Flink SQL更多信息,请参见Flink SQL参考

步骤一:新建Flink SQL任务

  1. 登录Dataphin控制台
  2. 在Dataphin控制台页面,选择工作区地域后,单击进入Dataphin>>
  3. 在Dataphin首页,单击顶部菜单栏研发
  4. 按照下图指引,进入新建Flink_SQL对话框。
    gagaga
  5. 新建Flink_SQL对话框,配置参数后,单击确定
    参数 说明
    名称 名称的命名规则如下:
    • 只能包含小写英文字母、数字、下划线(_)。
    • 名称的长度范围为3~62个字符。
    • 项目内的名称不支持重复。
    • 名称仅支持以英文字母开头。
    选择目录 计算任务所属的目录。
    资源队列 选择该项目所绑定实时计算源中的资源队列。
    引擎版本 选择当前资源队列所支持的版本。

步骤二:开发Flink SQL任务的代码

  1. 在Flink SQL任务代码页面,编写任务的代码。
    您可以单击页面右上方的格式化,系统自动调整SQL代码格式。
  2. 单击页面右上方的预编译,校验代码任务的语法及权限问题。
  3. 单击页面右上方的调试,代码任务采样数据并进行本地调试,保障代码任务的正确性。
    1. 调试配置对话框的选择采样模式页签中,选择调试的模式后,单击下一步
      fagag
    2. 采样调试数据页签中,为元数据进行采样调试。
      您可以通过自动抽样和上传数据的方式,为元数据进行采样调试。适用场景说明如下:
      • 自动抽样:自动抽样到的数据是随机的,所以适用于对采集到的数据没有限制的场景。选择自动抽样后,需要配置抽样条数
        注意 如果元数据表中没有数据,则自动抽样将采集不到数据。
      • 上传本地数据:自动抽样时元数据采集不到或数据抽样的逻辑比较严格,例如从100万条数据中抽取其中1条数据,这样采集效率就很低,可以选择手动上传本地数据。

        上传本地数据前需要先下载样例,根据下载的样例编辑需要上传的数据,单击上传后,数据自动填充至元数据采样区域。

        ggaga
      • 手动输入:适用于采集的数据比较少,或者需要修改已采集到的数据的场景。
    3. 完成所有数据表的元数据采样后,单击页面下方的确定
    4. Result页面,查看调试数据、中间结果和调试结果。
      gagag

步骤三:配置Flink SQL任务

  1. 任务配置页面,确认在实时模式页签。
    实时模式页签,为您展示任务运行所使用的资源队列、引擎版本、资源配置类型及自动调优。gagag资源队列引擎版本默认为创建Flink SQL任务时选择的配置。同时,您也可以修改资源队列和引擎版本。
    1. 选中自定义配置资源配置类型,并单击去配置
    2. 在资源配置页面,为您展示一张拓扑图,图中每个方框代表了一个计算任务,都可以进行独立配置。每个Group代表着Group内部的节点可以存放在一台机器进行计算,可以有效避免数据的网络传播,提升性能。图中当前的资源配置就是默认为您展示系统推荐的资源配置。
      gagag
    3. 单击需要配置资源的Group右上角的①后,在自定义配置Group执行参数对话框配置参数后,单击确定
      fdagag
      参数 描述
      core 通常,core配置成0.25。即一个CPU可以支持4个线程同时运算,因此core的取值不能超过1,0.25即可,0.125是最低下限了,同时支持8个线程。
      heap_memory 单位为MB,heap_memory是堆内存,供Java应用程序使用的内存。

      heap_memory及其内部各组成的大小可以通过JVM的一系列命令行参数来控制,在一般的blink程序中,都会需要一定的heap_memory开销,,例如申请一定的heap_memory作为程序的缓存等,因此您可以按程序的规模来设置其大小。

      parallel 表示同时并发的线程数量,用户可以选择合适的数量来运行自己的任务,并不是越大越好,越大代表你资源申请的越多,反而对性能有抑制。
      通常,一个简单计算节点每秒可以处理2000~4000条之间的数据。
      注意 如果源头是tt,tt的queue大小决定了parallel的上限,不能超过这个数字,否则程序将报错。
      direct_memory 单位为MB,direct_memory并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域。但是这部分内存也被频繁的使用,而且也可能导致OutOfMemoryError异常出现。如果您的程序有使用igraph或者swift,可以适当配置其大小,如16-32MB。

      在Java NIO(New Input/Output)类,引入了一种基于通道(Channel)与缓冲区(Buffer)的I/O方式,direct_memory可以使用Native函数库直接分配堆外内存,然后同一个存储在Java堆中的DirectByteBuffer对象作为这块内存的引用进行操作。这样能在一场场景中显著提高性能,因为避免了在Java堆和Native堆中来回复制数据。

      native_memory 单位为MB,native_memory没有相应的参数来控制大小,其大小依赖于操作系统进程的最大值(对于32位系统就是3~4G,各种系统的实现并不一样),以及生成的Java字节码大小、创建的线程数量、维持Java对象的状态信息大小(用于GC)以及一些第三方的包。
      native memory存放下面4种信息:
      • 管理Java heap的状态数据(用于GC)。
      • JNI调用,也就是Native Stack。
      • JIT(即使编译器)编译时使用native memory,并且JIT的输入(Java字节码)和输出(可执行代码)也都是保存在native memory。
      • NIO direct buffer。
    4. 按照下图指引,进入自定义配置Operator执行参数对话框并配置参数。完成参数配置后单击确定
      gagaga自定义配置Operator执行参数对话框中core、heap_memory、parallel、direct_memory和native_memory参数解释请参见上一步骤,下表仅对state_size、chain_strategy参数进行解释。
      参数 描述
      state_size 保持默认0即可。
      chain_strategy chain_strategy用于定义多个节点的链接策略,Dataphin支持的链接策略包括:
      • Always:默认参数,即和其他节点均部署在一台机器上,没有特殊要求。
      • Never:节点不会和其他节点放在一台机器上,即需要独立部署。
      • Head:可以接受和其他节点放在一台机器上,但是只能作为Group的头节点。
      注意 目前,Head和Never极少出现,默认Always即可。
    5. 配置完成后,单击页面右上方的保存
      注意 如果您想继续使用系统推荐的资源配置,则单击页面右上方的重置为系统初始
  2. 可选:保存资源配置记录。
    注意 仅实时模式支持保存资源配置记录。

    如果资源配置类型自定义配置,则单击资源信息记录后的保存当前配置为新纪录。在保存资源记录对话框中,输入资源记录名称后,单击确定

    对已有的资源信息记录,您可以执行以下操作。
    操作 描述
    查看版本信息 单击某个记录操作列下的tesga图标,查看版本信息。
    启用记录
    1. 单击某个记录的操作列下的taga图标。
    2. 提示对话框中,单击确定
    删除记录
    1. 单击某个记录操作列下的teag图标。
    2. 提示对话框中,单击确定
  3. 可选:打开自动调优开关,配置最大CU数期望最大内存,开启自动调优。
    注意 仅实时模式支持自动调优功能。
  4. 可选:配置实时任务的时间参数。
    实时任务的时间参数为stat_date,用于方式实时计算任务的运行时间的偏移,例如,您需要计算当天某个指标聚合值,为了防止时间偏移,则您需要设置state_date大于当天零点,过滤掉偏移的时间点。

    为了规避在任务参数处经常漏掉配置stat_date,您只需要在实时任务配置的属性配置中新增stat_date的kv配置,其中Value是一个基于业务时间的表达式,同时您也可以配置多个时间参数,使用半角分号(;)分割。例如,stat_date=${yyyyMMdd-1},则任务运行过程中的开始执行时间为${yyyyMMdd-1}。

    fagag
  5. 配置Flink SQL任务的依赖关系。
    任务配置面板的依赖关系区域,配置依赖关系。fa'ga'gfagag
    参数 描述
    自动解析 当节点的任务类型为SQL时,您可以单击自动解析,系统会解析代码中的表,并查找到与该表名相同的输出名称。输出名称所在的节点作为当前节点的上游依赖。
    如果代码中引用项目变量或不指定项目,则系统默认解析为生产项目名,以保证生成调度的稳定性。例如,开发项目名称为onedata_dev
    • 如果代码里指定select * from s_order,则调度解析依赖为onedata.s_order
    • 如果代码里指定select * from ${onedata}.s_order,则调度解析依赖为onedata.s_order
    • 如果代码里指定select * from onedata.s_order,则调度解析依赖为onedata.s_order
    • 如果代码里指定select * from onedata_dev.s_order,则调度解析依赖为onedata_dev.s_order
    上游依赖 通过执行如下操作,添加该节点任务调度时依赖的上游节点:
    1. 单击手动添加上游
    2. 新建上游依赖对话框中,您可以通过以下两种方式搜索依赖节点:
      • 输入所依赖节点的输出名称的关键字进行搜索节点。
      • 输入virtual搜索虚拟节点(每个租户或企业在初始化时都会有一个根节点)。
      说明 节点的输出名称是全局唯一的,且不区分大小写。
    3. 单击确定新增
    同时您还可以单击操作列下的fagaga图标,删除已添加的依赖节点。
    当前节点 通过执行如下操作,设置当前节点的输出名称,根据需要您可以设置多个输出名称,供其他节点依赖使用:
    1. 单击手动添加输出
    2. 新增当前节点输出对话框中,填写输出名称。输出名称的命名规则请尽量统一,一般命名规则为生成项目名.表名且不区分大小写,以标识本节点产出的表,同时其他节点更好地选择调度依赖关系。

      例如,开发项目名称为onedata_dev,建议将输出名称设置为onedata.s_order。如果您将输出名称设置为onedata_dev.s_order,则仅限代码select * from onedata_dev.s_order能解析出上游依赖节点。

    3. 单击确定新增
    同时您还可以对当前节点已添加的输出名称执行如下操作:
    • 单击操作列下的fagaga图标,删除已添加的输出名称。
    • 如果该节点已提交或已发布,且被任务所依赖(任务已提交),则单击操作列下的图标,查看下游节点。
  6. 可选:配置Flink SQL任务的调度参数
    1. 任务配置面板的离线模式页签的调度配置区域,配置任务的调度参数。
      gagaga
      参数 描述
      时间属性 选择时间属性时间属性包括:
      • 正常调度:按照调度周期的时间配置调度,并正常执行,通常任务默认选中该项。
      • 空跑调度:按照调度周期的时间配置调度,但都是空跑执行,即一调度到该任务便直接返回成功,没有真正的执行任务。
      暂停调度 暂停调度选择后,即可暂停该任务的调度,会按照下面的调度周期时间配置调度,但是一旦调度到该任务会直接返回失败,不会执行。通常用于某个任务暂时不用执行,但后面还会继续使用的场景。
      调度周期 调度周期可选择小时分钟
      • 调度,即调度任务每天自动运行一次。新建周期任务时,系统默认的时间周期为每天0点运行一次。您可以根据需要,单击图标,指定运行的时间点。
      • 调度,即调度任务每周的特定几天,在特定时间点自动运行一次。您可以根据需要,单击图标,指定运行的时间点。

        如果您没有指定日期,为保证下游实例正常运行,系统会生成实例后直接设置为运行成功,而不会真正执行任何逻辑,也不会占用资源。

      • 调度,即调度任务在每月的特定几天,在特定时间点自动运行一次。您可以根据需要,单击图标,指定运行的时间点。

        如果在没有被指定的日期时,为保证下游实例正常运行,系统会每天生成实例后直接设置为运行成功,而不会真正执行任何逻辑,也不会占用资源。

      • 小时调度,即每天指定的时间段内,调度任务按间隔时间数的时间间隔运行一次。或选择指定的时间点,调度系统会自动为任务生成实例并运行。您可以根据业务需求选中时间段时间点
        • 如果您选中了时间段,您可以单击开始结束后的图标,指定运行的开始和结束时间。同时您可以单击间隔后的test图标,在下拉列表中选择间隔时间。
        • 如果您选中了时间点,单击下拉列表框,在下拉列表中选择时间点。

        例如,每天00:00~23:59的时间段内,每隔1小时会自动调度一次,因此调度系统会自动为任务生成实例并运行。

      • 分钟调度,即每天指定的时间段内,调度任务按间隔时间数的时间间隔运行一次。 您可以单击开始结束后的图标,指定运行的开始和结束时间。同时您可以单击间隔后的test图标,在下拉列表中选择间隔时间。
      依赖上周期

      根据业务场景选择本周期节点的运行,是否需要依赖上一周期本节点或其他节点的运行结果。

      选择节点类型。系统支持选择自定义当前。适用场景说明如下:
      • 本周期节点是否运行取决于上一周期本节点是否正常产出数据,则需要选择当前。只有上一周期本节点运行成功,才会启动运行本节点。
      • 代码任务没有用到某个节点的产出表,但业务上需要依赖该节点的上一周期是否正常产出数据,则需要选择依赖自定义节点。
      优先级 优先级定义了同一时间同一批待调度任务的优先级。优先级包括:
      • 最低优先级
      • 低优先级
      • 中等优先级
      • 高优先级
      • 最高优先级
      参数配置 您可以对代码中所用参数的具体赋值。单击节点参数配置说明,查看Dataphin调度系统的配置规则及支持配置的时间参数。
    2. 单击确定
  7. 可选:配置Flink SQL任务的参数。
    任务配置面板,任务参数配置区域,通常需要配置批任务的bizdate和流批任务的stat_date参数。faga
    • 流任务:定义格式为stat_date>='xxxx'
    • 批任务:定义格式为stat_date='xxxx'set project.table.batch.`partition`='ds=${bizdate}*';

步骤四:提交Flink SQL任务

  1. 在Flink SQL任务代码开发页面,单击页面右上方的test图标,保存Flink SQL任务。
  2. 在Flink SQL任务代码开发页面,单击页面右上方的taga图标,提交Flink SQL任务。
  3. 提交备注对话框,填写备注信息。
  4. 单击确定并提交,完成Flink SQL任务的提交。
  5. 可选:发布Flink SQL任务至生产环境。
    • 如果您的开发模式是Dev-Prod模式,则需要发布实时计算任务,详情请参见管理发布任务
    • 如果您的开发模式是Basic模式,则提交成功的实时计算任务,即可进入生产环境。

后续步骤

在运维中心查看并运维Flink SQL任务,保证任务的正常运行。具体操作,请参见实时任务实时实例