通过DataWorks控制台,您可以在MaxCompute中使用merge_udf.jar包将表格存储的增量数据转换为全量数据格式。

前提条件

步骤一:新建JAR资源

通过新建JAR资源,将下载的merge_udf.jar包上传到MaxCompute中。

  1. 进入数据开发页面。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击工作空间列表
    3. 选择工作空间所在地域后,单击相应工作空间后的进入数据开发
  2. 新建JAR资源。
    1. 数据开发页面,将鼠标悬停在新建图标,选择MaxCompute > 资源 > JAR
      注意 只有在工作空间配置页面添加MaxCompute引擎后,当前页面才会显示MaxCompute节点。具体操作,请参见配置工作空间

      您也可以打开相应的业务流程,右键单击MaxCompute,选择新建 > 资源 > JAR

      如果您需要创建业务流程,请参见创建业务流程

    2. 新建资源对话框,填写资源名称,并选择目标文件夹
      说明
      • 资源名称无需与上传的文件名保持一致,JAR资源的后缀必须为.jar
      • 如果该JAR包已经在MaxCompute(ODPS)客户端上传过,则需要取消选择上传为ODPS资源,否则上传会报错。
    3. 单击点击上传,选择相应的文件进行上传。
    4. 单击新建
    fig_jar_source
  3. 在资源编辑页面,提交资源到调度开发服务器端。
    1. 单击工具栏中的提交图标。
    2. 提交新版本对话框,输入变更描述。
    3. 单击确认

步骤二:新建并注册函数

  1. 新建函数。
    1. 数据开发页面,将鼠标悬停在新建图标,选择MaxCompute > 函数
      您也可以打开相应的业务流程,右键单击MaxCompute,选择新建 > 函数
    2. 新建函数对话框,输入函数名称,并选择目标文件夹
    3. 单击新建
    fig_fc
  2. 注册函数。
    1. 注册函数页面,选择函数类型其他函数
    2. 填写资源列表步骤一:新建JAR资源中的资源名称,并根据表类型和模式填写对应的类名
      根据表类型不同可以选择的模式不同,单版本表只能选择单版本模式,多版本表可以选择多版本模式V1或者多版本模式V2。关于模式选择的更多信息,请参见附录:模式选择
      表类型 模式 类名
      单版本表 单版本模式 com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell
      多版本表 多版本模式V1 com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1
      多版本模式V2 com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2
      fig_fcregis
  3. 在函数编辑页面,提交函数到调度开发服务器端。
    1. 单击工具栏中的提交图标。
    2. 提交新版本对话框,输入变更描述。
    3. 单击确认

步骤三:编写ODPS SQL并运行

  1. 新建ODPS SQL节点。
    1. 数据开发页面,将鼠标悬停在新建图标,选择Maxcompute > ODPS SQL
      您也可以打开相应的业务流程,右键单击MaxCompute,选择新建 > ODPS SQL
    2. 新建节点对话框,填写节点名称,并选择目标文件夹
    3. 单击提交
    fig_sql
  2. 在节点编辑页面,编写ODPS SQL语句并运行。
    ODPS SQL语句的语法如下:
    select function_name(para_list) 
     as (custom_para_list)
    from(
        select * from stream_table_name distribute by primary_keys sort by primary_keys,SequenceID
    )t;

    其中function_name步骤二:新建并注册函数中的函数名称,para_list附录:模式选择中的参数列表,custom_para_list为自定义参数列表,stream_table_name为增量表的名称,primary_keys为表格存储数据表的主键列表,SequenceID为增量表的sequenceid。

附录:模式选择

模式包括单版本模式、多版本模式V1和多版本模式V2,请根据表类型选择合适的模式。

  • 单版本模式
    • 类名:com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell
    • 参数列表

      参数列表的格式为pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo。具体参数说明请参见下表,请根据实际填写pknum、colnum、colnames和pknames,其他参数保持参数名称即可。

      参数 类型 描述
      INT pknum 常量 表格存储数据表的主键个数。
      INT colnum 常量 表格存储数据表中的属性列个数。
      List<String> colnames 常量 要合并增量变更的属性列名称。
      List<String> pknames 变量 表格存储数据表的主键列表。
      STRING colname 变量 属性列。
      BIGINT version 变量 版本号。
      STRING colvalue 变量 增量值。
      STRING optype 变量 增量操作类型。
      STRING sequenceinfo 变量 自增保序sequenceid。
    • 示例
      当表格存储数据表的主键为pk1,pk2且属性列为col1,col2,col3时,则在MaxCompute中创建的增量表Schema为pk1,pk2,colname,version,colvalue,optype,sequenceinfo,如果要合并增量变更的属性列为col1,col2,col3,设置参数列表为2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo,ODPS SQL语句示例如下:
      select mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo) 
       as (pk1,pk2,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted)
      from(
          select * from stream_table_name distribute by pk1,pk2 sort by pk1,pk2,sequenceInfo
      )t;
      执行ODPS SQL语句后的输出结果请参见下表。
      pk1 pk2 col1 co1_is_deleted col2 col2_is_deleted col3 col3_is_deleted
      test 0 \N \N 20 \N \N true
      输出结果的说明如下:
      • 当col列和col_is_deleted列均为\N时,表示col列无任何增量操作。
      • 当col列为具体的值且col_is_deleted列为\N时,表示col列的值被修改为对应值。
      • 当col列为\N且col_is_deleted列为true时,表示col列被删除。
  • 多版本模式V1
    • 类名:com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1
    • 参数列表

      参数列表的格式为pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo。具体参数说明请参见下表,请根据实际填写pknum、colnum、colnames和pknames,其他参数保持参数名称即可。

      参数 类型 描述
      INT pknum 常量 表格存储数据表的主键个数。
      INT colnum 常量 表格存储数据表中的属性列个数。
      List<String> colnames 常量 要合并增量变更的属性列名称。
      List<String> pknames 变量 表格存储数据表的主键列表。
      STRING colname 变量 属性列。
      BIGINT version 变量 版本号。
      STRING colvalue 变量 增量值。
      STRING optype 变量 增量操作类型。
      STRING sequenceinfo 变量 自增保序sequenceid。
    • 示例
      当表格存储数据表的主键为pk1,pk2且属性列为col1,col2,col3时,则在MaxCompute中创建的增量表Schema为pk1,pk2,colname,version,colvalue,optype,sequenceinfo,如果要合并增量变更的属性列为col1,col2,col3,设置参数列表为2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo,ODPS SQL语句示例如下:
      select mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo) 
       as (pk1,pk2,version,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted)
      from(
          select * from stream_table_name distribute by pk1,pk2 sort by pk1,pk2,sequenceinfo
      )t;
      执行ODPS SQL语句后的输出结果请参见下表。
      pk1 pk2 version col1 co1_is_deleted col2 col2_is_deleted col3 col3_is_deleted
      test 0 123 \N \N 20 \N \N true
      输出结果的说明如下:
      • 当version列为具体的值,且col列和col_is_deleted列均为\N时,表示col列对应版本没有任何增量操作。
      • 当version列和col列均为具体的值,且col_is_deleted列为\N时,表示col列对应版本的值被修改为具体的值。
      • 当version列为具体的值,col列为\N,且col_is_deleted列为true时,表示col列对应版本被删除。
      • 当version列和col列均为\N,且col_is_deleted列为true时,表示存在删除一列所有版本的操作。
  • 多版本模式V2
    • 类名:com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2
    • 参数列表

      参数列表的格式为pknum,colnum,maxversion,colnames,pknames,colname,version,colvalue,optype,sequenceinfo。具体参数说明请参见下表,请根据实际填写pknum、colnum、maxversion、colnames和pknames,其他参数保持参数名称即可。

      参数 类型 描述
      BIGINT pknum 常量 表格存储数据表的主键个数。
      BIGINT colnum 常量 表格存储数据表中的属性列个数。
      BIGINT maxversion 常量 最大版本数。
      List<String> colnames 常量 要合并增量变更的属性列名称。
      List<String> pknames 变量 表格存储数据表的主键列表。
      STRING colname 变量 属性列。
      BIGINT version 变量 版本号。
      STRING colvalue 变量 增量值。
      STRING optype 变量 增量操作类型。
      STRING sequenceinfo 变量 自增保序sequenceid。
    • 示例
      当表格存储数据表的主键为pk1,pk2,属性列为col1,col2,col3且最大版本数为3时,则在MaxCompute中创建的增量表Schema为pk1,pk2,colname,version,colvalue,optype,sequenceinfo,如果要合并增量变更的属性列为col1,col2,col3,设置参数列表为2,3,3,"col1","col2","col3",pk1,pk2,colName,version,colValue,opType,sequenceInfo,ODPS SQL语句示例如下:
      select mergeCell(2,3,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,opyype,sequenceinfo) 
       as (pk1,pk2,col1,col2,col3)
      from(
          select * from stream_table_name distribute by pk1,pk2 sort by pk1,pk2,sequenceinfo
      )t;
      执行ODPS SQL语句后的输出结果请参见下表。
      pk1 pk2 col1 col2 col3
      test 02 {"data":[{"version":1621330803390,"value":"value001"},{"version":1621330795198,"value":"value002"},{"version":1621330785936,"value":"value003"}],"needDeleteAllVersionFirst":true,"deleteVersions":[]} \N \N
      输出结果的说明如下:
      • data表示新写入的数据列表,按照版本号降序排序。最多保留maxversion个版本的数据。
      • needDeleteAllVersionFirst表示该列是否需要删除原有全部版本。当出现删除一行DeleteRow或删除一列的所有版本DeleteColumns时,该值为true,否则为false。
      • deleteVersions表示该列需要删除的版本列表,按照版本号降序排序。最多保留maxversion个版本。

        deleteVersions中的版本号不会与data中的版本号相同,当needDeleteAllVersionFirst为true时,deleteVersions为空列表。