当OSS中的数据格式比较复杂,内置的Extractor无法满足需求时,您可以自定义Extractor来读取OSS上的非结构化数据(文本数据和非文本数据)。本文为您介绍如何自定义Extractor,并基于自定义Extractor来创建OSS外部表,实现访问存储在OSS上的数据。

前提条件

在访问OSS非结构化数据前,请您确认已完成以下操作:
  • OSS授权。

    更多授权操作信息,请参见STS模式授权

  • 已准备好OSS存储空间(Bucket)、OSS目录及OSS数据文件。

    更多OSS存储空间操作信息,请参见创建存储空间

    更多新建OSS目录操作信息,请参见创建目录

    更多上传数据文件操作信息,请参见上传文件

使用限制

注意事项

使用OSS外部表时,您需要注意:

  • 如果您创建的OSS外部表为分区表,分区数据在OSS上的存放路径必须符合一定的格式要求。更多格式要求,请参见OSS外部表的数据分区
  • OSS外部表只是在系统中记录与OSS目录的关联关系。当删除外部表时,不会删除对应OSS目录下的数据。
  • 如果OSS上的数据文件类型为归档文件,需要先解冻文件。更多解冻操作,请参见解冻文件

创建外部表语法格式

create external table [if not exists] <mc_oss_extable_name>
(
<col_name> <date_type>,
...
)
[partitioned by (<col_name> <data_type>, ...)]
--指定自定义的Extractor。
stored by '<StorageHandler>' 
--指定外部表相关参数。
with serdeproperties (
 'delimiter'='<delimiter>',  
 'odps.properties.rolearn'='<ram_arn>'
--OSS文件为GZIP压缩格式时需要配置。
 [,'odps.text.option.gzip.input.enabled'='true']
--其他OSS外部表相关属性,根据实际需要配置。
 [,'<property_name>'='<property_value>'[,'<property_name>'='<property_value>'...]]
) 
location '<oss_location>'
using '<jar_name>';
  • if not exists:可选。如果不指定if not exists选项而存在同名表,会报错。如果指定if not exists,无论是否存在同名表,即使原表结构与要创建的目标表结构不一致,均返回成功。已存在的同名表的元数据信息不会被改动。
  • mc_oss_extable_name:必填。待创建的OSS外部表的名称。
  • col_name:必填。OSS外部表的列名称。
  • data_type:必填。OSS外部表的列的数据类型。
  • partitioned by (<col_name> <data_type>, ...):可选。指定OSS外部表为分区表时的分区信息。
    • col_name:必填。分区列的名称。
    • data_type:必填。分区列的数据类型。
  • StorageHandler:必填。指定自定义的StorageHandler的类名称。
  • odps.properties.rolearn'='<ram_arn>:必填。指定RAM中AliyunODPSDefaultRole的ARN信息。您可以通过RAM控制台中的角色详情获取。
  • location:必填。指定数据文件的OSS路径。OSS目录格式为oss://<oss_endpoint>/<Bucket名称>/<目录名称>/。系统会默认读取该目录下的所有文件。
    • oss_endpoint:OSS访问域名信息。建议您使用OSS提供的内网域名,否则将产生OSS流量费用。更多OSS内网域名信息,请参见访问域名和数据中心。建议数据存放的OSS区域与MaxCompute项目所在区域保持一致。由于MaxCompute只在部分区域部署,跨区域的数据连通性可能存在问题。
    • Bucket名称:OSS存储空间名称,即Bucket名称。查看存储空间名称操作,请参见列举存储空间
    • 目录名称:指定OSS目录名称。目录后不需要指定文件名,错误用法如下:
      http://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/                  -- 不支持HTTP连接。
      https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/                 -- 不支持HTTPS连接。
      oss://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo                    -- 连接地址错误。
      oss://oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/vehicle.csv  -- 不需要指定文件名。
  • 'odps.text.option.gzip.input.enabled'='true':当OSS数据文件为GZIP压缩格式时,必须配置。
  • <property_name>'='<property_value>':可选。property_name为属性名称,property_value为属性值。更多属性信息,请参见内置Extractor访问OSS
  • USING:指定Extractor类定义所在的JAR包。

自定义Extractor访问OSS上的文本数据

以文本数据文件为例,记录之间的列通过|分隔。存储路径为/demo/SampleData/CustomTxt/AmbulanceData/vehicle.csv,数据内容如下。
1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S
1|2|13|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|3|48|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|4|30|1|46.81006|-92.08174|9/14/2014 0:00|W
1|5|47|1|46.81006|-92.08174|9/14/2014 0:00|S
1|6|9|1|46.81006|-92.08174|9/14/2014 0:00|S
1|7|53|1|46.81006|-92.08174|9/14/2014 0:00|N
1|8|63|1|46.81006|-92.08174|9/14/2014 0:00|SW
1|9|4|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|10|31|1|46.81006|-92.08174|9/14/2014 0:00|N

操作流程如下:

  1. 通过IDEA工具定义一个Extractor类。
    编写一个通用的Extractor,将分隔符作为参数传入,可以处理所有类似格式的TEXT文件。代码示例如下。
    /**
     * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.)
     **/
    public class TextExtractor extends Extractor {
      private InputStreamSet inputs;
      private String columnDelimiter;
      private DataAttributes attributes;
      private BufferedReader currentReader;
      private boolean firstRead = true;
      public TextExtractor() {
        // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
        this.columnDelimiter = ",";
      }
      // no particular usage for execution context in this example
      @Override
      public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
        this.inputs = inputs; //inputs是一个InputStreamSet,每次调用next()返回一个InputStream,这个InputStream可以读取一个OSS文件的所有内容。
        this.attributes = attributes;
        // check if "delimiter" attribute is supplied via SQL query
        String columnDelimiter = this.attributes.getValueByKey("delimiter"); //delimiter通过DDL语句传参。
        if ( columnDelimiter != null)
        {
          this.columnDelimiter = columnDelimiter;
        }
        // note: more properties can be inited from attributes if needed
      }
      @Override
      public Record extract() throws IOException {//extactor()调用返回一条Record,代表外部表中的一条记录。
        String line = readNextLine();
        if (line == null) {
          return null; // 返回NULL来表示这个表中已经没有记录可读。
        }
        return textLineToRecord(line); // textLineToRecord将一行数据按照delimiter分割为多个列。
      }
      @Override
      public void close(){
        // no-op
      }
    }
    说明 更多使用TextLineToRecord将数据分割的完整实现,请参见TextExtractor.java
  2. 通过IDEA工具定义一个StorageHandler类。
    StorageHandler是外部表自定义逻辑的统一入口。代码示例如下。
    package com.aliyun.odps.udf.example.text;
    public class TextStorageHandler extends OdpsStorageHandler {
      @Override
      public Class<? extends Extractor> getExtractorClass() {
        return TextExtractor.class;
      }
      @Override
      public Class<? extends Outputer> getOutputerClass() {
        return TextOutputer.class;
      }
    }
  3. 通过IDEA工具编译上述自定义类代码并打包为JAR包,通过MaxCompute客户端添加为MaxCompute资源。
    添加MaxCompute资源命令示例如下。
    add jar odps-udf-example.jar;
  4. 通过MaxCompute客户端创建外部表。
    命令示例如下,其中delimeter是OSS数据的分隔符名称。
    create external table if not exists ambulance_data_txt_external
    (
    vehicleId int,
    recordId int,
    patientId int,
    calls int,
    locationLatitute double,
    locationLongtitue double,
    recordTime string,
    direction string
    )
    stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler' 
      with serdeproperties (
    'delimiter'='\\|',  
    'odps.properties.rolearn'='acs:ram::xxxxxxxxxxxxx:role/aliyunodpsdefaultrole'
    )
    location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/'
    using 'odps-udf-example.jar'; 
  5. 通过MaxCompute客户端执行如下SQL语句查询外部表。
    select recordId, patientId, direction from ambulance_data_txt_external where patientId > 25;

自定义Extractor访问OSS上的非文本数据

以语音数据(WAV格式文件)为例,为您介绍如何通过自定义的Extractor访问并处理OSS上的非文本数据文件。

操作流程如下:

  1. 自定义Extractor、StorageHandler类并编译、打包为JAR包。更多逻辑实现,请参见SpeechSentenceSnrExtractor
  2. 通过MaxCompute客户端执行如下SQL语句创建外部表。
    create external table if not exists speech_sentence_snr_external
    (
    sentence_snr double,
    id string
    )
    stored by 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler'
    with serdeproperties (
        'mlfFileName'='sm_random_5_utterance.text.label' ,
        'speechSampleRateInKHz' = '16'
    )
    location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/'
    using 'odps-udf-example.jar,sm_random_5_utterance.text.label';
    • com.aliyun.odps.udf.example.speech.SpeechStorageHandler:封装的Extractor可计算语音文件的平均有效语句信噪比,并将抽取出来的结构化数据直接进行SQL运算。
    • locationoss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/存储了多个原始的WAV格式的语音文件,MaxCompute框架会读取该地址上的所有文件,并按照文件级别分片,自动将文件分配给多个计算节点处理。
  3. 通过MaxCompute客户端执行如下SQL语句查询结果。
    select sentence_snr, id from speech_sentence_snr_external where sentence_snr > 10.0; 
    返回结果如下。
    --------------------------------------------------------------
    | sentence_snr |                     id                      |
    --------------------------------------------------------------
    |   34.4703    |          J310209090013_H02_K03_042          |
    --------------------------------------------------------------
    |   31.3905    | tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0 |
    --------------------------------------------------------------
    |   35.4774    | tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0  |
    --------------------------------------------------------------
    |   16.0462    | tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0  |
    --------------------------------------------------------------
    |   14.5568    |   tsh_148_3013_5_13_47_3d5008d792408f81_0   |
    --------------------------------------------------------------

通过自定义Extractor,可以在SQL语句上分布式地处理多个OSS上的语音数据文件。您可以使用同样的方法利用MaxCompute的大规模计算能力,处理图像、视频等各种类型的非结构化数据。