MaxCompute的非结构化框架支持您通过INSERT方式将MaxCompute的数据通过关联的OSS外部表直接输出到OSS。本文为您介绍如何将数据输出到OSS。
前提条件
在将数据输出到OSS前,请您确认已完成以下操作:
背景信息
- 通过内置Extractor将数据输出到OSS
使用MaxCompute内置的Extractor时,您需要通过建表语句创建OSS外部表,并通过INSERT操作非常方便地按照约定格式(TSV或CSV)将数据输出到OSS进行存储。
MaxCompute内置的Extractor包含如下两种:com.aliyun.odps.CsvStorageHandler
:定义如何读写CSV格式的数据。数据各列以英文逗号(,)为分隔符,换行符为\n
。com.aliyun.odps.TsvStorageHandler
:定义如何读写TSV格式的数据。数据各列以\t
为分隔符,换行符为\n
。
- 通过自定义StorageHandler输出到OSS
MaxCompute非结构化框架还提供了通用的SDK自定义Extractor,支持对外输出自定义数据格式文件。
与内置Extractor一样,您需要先创建OSS外部表,再通过对外部表的INSERT操作实现将数据输出到OSS。不同点在于创建外部表时,stored by
需要指定自定义的Extractor。说明 MaxCompute非结构化框架通过StorageHandler接口,来描述对各种数据存储格式的处理。StorageHandler作为一个wrapper class,允许您指定自定义的Extractor(用于数据的读入、解析、处理等)和Outputer(用于数据的处理和输出等)。自定义的StorageHandler应该继承OdpsStorageHandler,实现getExtractorClass和getOutputerClass接口。
更多创建OSS外部表语法信息,请参见创建外部表语法格式。
使用限制
执行INSERT操作将数据输出至OSS时,单个文件的大小不能超过5 GB。
通过内置Extractor将数据输出到OSS
- OSS内网域名:
oss-cn-hangzhou-internal.aliyuncs.com
,即华东1(杭州)。 - 非分区表数据存储路径:
oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo1/output/
- 分区表数据存储路径:
oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo2/output/
操作流程如下:
- 登录MaxCompute客户端,创建OSS外部表。
命令示例如下。
- 创建非分区表
create external table if not exists mc_oss_csv_external4 ( vehicleId int, recordId int, patientId int, calls int, locationLatitute double, locationLongtitue double, recordTime string, direction string ) stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ( 'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole' ) location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo1/output/';
- 创建分区表
create external table if not exists mc_oss_csv_external5 ( vehicleId int, recordId int, patientId int, calls int, locationLatitute double, locationLongtitue double, recordTime string ) partitioned by ( direction string ) stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ( 'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole' ) location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo2/output/';
说明 如果with serdeproperties
中未设置odps.properties.rolearn属性,且授权方式是采用明文AccessKey,则location
格式如下。location 'oss://<accessKeyId>:<accessKeySecret>@<oss_endpoint>/<Bucket名称>/<目录名称>/'
- 创建非分区表
- 通过MaxCompute客户端对外部表执行
insert overwrite
或insert into
命令,将数据输出到OSS。更多INSERT操作信息,请参见插入或覆写数据(INSERT INTO | INSERT OVERWRITE)或插入或覆写动态分区数据(DYNAMIC PARTITION)。
命令示例如下。- 将非分区表数据输出到OSS
insert into table mc_oss_csv_external4 select * from mc_oss_csv_external1;
mc_oss_csv_external1的数据信息,请参见内置Extractor访问OSS。
执行成功后,您可以在OSS路径下查看到导出的文件。output文件夹下产生了一个.odps文件夹,包含.csv与.meta文件。
- 将分区表数据输出到OSS
insert into table mc_oss_csv_external5 partition (direction) select * from mc_oss_csv_external2;
mc_oss_csv_external2的数据信息,请参见内置Extractor访问OSS。
执行成功后,您可以在OSS路径下查看到导出的文件。output文件夹下根据INSERT语句指定的分区值生成对应的分区子目录。分区子目录中包括.odps文件夹。例如
output/direction=N/.odps/20210330*********/M1_0_0-0_TableSink1-0-.csv
。
说明- .odps文件夹中的.meta文件为MaxCompute额外输出的宏数据文件,用于记录当前文件夹中有效的数据。正常情况下,如果INSERT操作成功,可以认为当前文件夹的所有数据均是有效数据。只有在有作业失败的情况下,需要对该宏数据进行解析。即使是在作业中途失败或被中止的情况下,对于INSERT OVERWRITE操作,再运行一次即可。
- 对于MaxCompute内置的TSV或CSV Extractor,处理产生的文件数目与对应SQL的并发度相同。
- 如果
insert overwrite ... select ... from ...;
操作在源数据表from_tablename上分配了1000个Mapper,则最后将产生1000个TSV或CSV文件。 - 如果您需要控制生成文件的数目,可以通过MaxCompute的各种灵活语义和配置来实现。如果Outputer在Mapper里,可以通过调整
odps.stage.mapper.split.size
的大小来控制Mapper的并发数,从而调整产生的文件数目。如果Outputer在Reducer或Joiner里,也可以分别通过odps.stage.reducer.num
和odps.stage.joiner.num
来调整。
- 将非分区表数据输出到OSS
通过自定义StorageHandler输出到OSS
操作流程如下:
- 通过IDEA工具定义一个Outputer类。
输出逻辑都必须实现Outputer接口。
package com.aliyun.odps.examples.unstructured.text; import com.aliyun.odps.data.Record; import com.aliyun.odps.io.OutputStreamSet; import com.aliyun.odps.io.SinkOutputStream; import com.aliyun.odps.udf.DataAttributes; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.Outputer; import java.io.IOException; public class TextOutputer extends Outputer { private SinkOutputStream outputStream; private DataAttributes attributes; private String delimiter; public TextOutputer (){ // default delimiter, this can be overwritten if a delimiter is provided through the attributes. this.delimiter = "|"; } @Override public void output(Record record) throws IOException { this.outputStream.write(recordToString(record).getBytes()); } // no particular usage of execution context in this example @Override public void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) throws IOException { this.outputStream = outputStreamSet.next(); this.attributes = attributes; this.delimiter = this.attributes.getValueByKey("delimiter"); if ( this.delimiter == null) { this.delimiter=","; } System.out.println("Extractor using delimiter [" + this.delimiter + "]."); } @Override public void close() { // no-op } private String recordToString(Record record){ StringBuilder sb = new StringBuilder(); for (int i = 0; i < record.getColumnCount(); i++) { if (null == record.get(i)){ sb.append("NULL"); } else{ sb.append(record.get(i).toString()); } if (i != record.getColumnCount() - 1){ sb.append(this.delimiter); } } sb.append("\n"); return sb.toString(); } }
Outputer接口有三个:setup、output和close,与Extractor的setup、extract和close三个接口基本上对称。其中:
setup()
和close()
在一个Outputer中只会调用一次,可以在setup()
里做初始化准备工作。通常,您需要把setup()
传递进来的这三个参数保存成Outputer的class variable
,方便在之后output()
或close()
接口中使用。而close()
接口用于代码的扫尾工作。通常,数据处理发生在output(Record)
接口内。MaxCompute根据当前Outputer分配处理的每个输入Record调用一次output(Record)
。假设,在一个output(Record)
调用返回的时候,代码已经消费完该Record,则在当前output(Record)
返回后,系统会将Record所使用的内存另作他用。因此,当Record中的信息在跨多个output()
函数调用时,需要调用当前处理的record.clone()
方法,将当前Record保存下来。说明 使用外部表自定义Extractor实现Outputer接口时,Outputer.output(Record record)
中传入的Record是Outputer的上一个操作产生的记录,这个列名发生了变化。这些列名不保证固定,例如表达式some_function(column_a)
产生的列名是一个临时列名。因此,当您使用
record.get(列名)
方式来获取列的内容时需特别注意,建议改用record.get(index)
方式获取。 - 通过IDEA工具定义一个Extractor类。
Extractor用于数据的读入、解析、处理等,如果输出的表最终不需要再通过MaxCompute进行读取等,无需定义Extractor。
package com.aliyun.odps.examples.unstructured.text; import com.aliyun.odps.Column; import com.aliyun.odps.data.ArrayRecord; import com.aliyun.odps.data.Record; import com.aliyun.odps.io.InputStreamSet; import com.aliyun.odps.udf.DataAttributes; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.Extractor; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; /** * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.) **/ public class TextExtractor extends Extractor { private InputStreamSet inputs; private String columnDelimiter; private DataAttributes attributes; private BufferedReader currentReader; private boolean firstRead = true; public TextExtractor() { // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes) this.columnDelimiter = ","; } // no particular usage for execution context in this example @Override public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) { this.inputs = inputs; this.attributes = attributes; // check if "delimiter" attribute is supplied via SQL query String columnDelimiter = this.attributes.getValueByKey("delimiter"); if ( columnDelimiter != null) { this.columnDelimiter = columnDelimiter; } System.out.println("TextExtractor using delimiter [" + this.columnDelimiter + "]."); // note: more properties can be inited from attributes if needed } @Override public Record extract() throws IOException { String line = readNextLine(); if (line == null) { return null; } return textLineToRecord(line); } @Override public void close(){ // no-op } private Record textLineToRecord(String line) throws IllegalArgumentException { Column[] outputColumns = this.attributes.getRecordColumns(); ArrayRecord record = new ArrayRecord(outputColumns); if (this.attributes.getRecordColumns().length != 0){ // string copies are needed, not the most efficient one, but suffice as an example here String[] parts = line.split(columnDelimiter); int[] outputIndexes = this.attributes.getNeededIndexes(); if (outputIndexes == null){ throw new IllegalArgumentException("No outputIndexes supplied."); } if (outputIndexes.length != outputColumns.length){ throw new IllegalArgumentException("Mismatched output schema: Expecting " + outputColumns.length + " columns but get " + parts.length); } int index = 0; for(int i = 0; i < parts.length; i++){ // only parse data in columns indexed by output indexes if (index < outputIndexes.length && i == outputIndexes[index]){ switch (outputColumns[index].getType()) { case STRING: record.setString(index, parts[i]); break; case BIGINT: record.setBigint(index, Long.parseLong(parts[i])); break; case BOOLEAN: record.setBoolean(index, Boolean.parseBoolean(parts[i])); break; case DOUBLE: record.setDouble(index, Double.parseDouble(parts[i])); break; case DATETIME: case DECIMAL: case ARRAY: case MAP: default: throw new IllegalArgumentException("Type " + outputColumns[index].getType() + " not supported for now."); } index++; } } } return record; } /** * Read next line from underlying input streams. * @return The next line as String object. If all of the contents of input * streams has been read, return null. */ private String readNextLine() throws IOException { if (firstRead) { firstRead = false; // the first read, initialize things currentReader = moveToNextStream(); if (currentReader == null) { // empty input stream set return null; } } while (currentReader != null) { String line = currentReader.readLine(); if (line != null) { return line; } currentReader = moveToNextStream(); } return null; } private BufferedReader moveToNextStream() throws IOException { InputStream stream = inputs.next(); if (stream == null) { return null; } else { return new BufferedReader(new InputStreamReader(stream)); } } }
- 通过IDEA工具定义一个StorageHandler类。
package com.aliyun.odps.examples.unstructured.text; import com.aliyun.odps.udf.Extractor; import com.aliyun.odps.udf.OdpsStorageHandler; import com.aliyun.odps.udf.Outputer; public class TextStorageHandler extends OdpsStorageHandler { @Override public Class<? extends Extractor> getExtractorClass() { return TextExtractor.class; } @Override public Class<? extends Outputer> getOutputerClass() { return TextOutputer.class; } }
若表无需读取,无需指定Extractor接口。
- 通过IDEA工具编译上述自定义类代码并打包为JAR包,通过MaxCompute客户端添加为MaxCompute资源。
添加MaxCompute资源命令示例如下。
add jar odps-TextStorageHandler.jar;
- 通过MaxCompute客户端创建外部表。
与使用内置Extractor类似,需要创建OSS外部表,不同的是指定数据输出到OSS外部表时,使用自定义的Extractor。
- 创建非分区表
create external table if not exists output_data_txt_external1 ( vehicleId int, recordId int, patientId int, calls int, locationLatitute double, locationLongtitue double, recordTime string, direction string ) stored by 'com.aliyun.odps.examples.unstructured.text.TextStorageHandler' with serdeproperties( 'delimiter'='|' [,'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole']) location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/output/' using 'odps-TextStorageHandler.jar';
- 创建分区表
create external table if not exists output_data_txt_external2 ( vehicleId int, recordId int, patientId int, calls int, locationLatitute double, locationLongtitue double, recordTime string ) partitioned by ( direction string ) stored by 'com.aliyun.odps.examples.unstructured.text.TextStorageHandler' with serdeproperties( 'delimiter'='|' [,'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole']) location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/output/' using 'odps-TextStorageHandler.jar';
- 创建非分区表
- 通过MaxCompute客户端对外部表执行
insert overwrite
或insert into
命令,将数据输出到OSS。更多INSERT操作信息,请参见插入或覆写数据(INSERT INTO | INSERT OVERWRITE)或插入或覆写动态分区数据(DYNAMIC PARTITION)。
命令示例如下。- 将非分区表数据输出到OSS
insert into table output_data_txt_external1 select * from mc_oss_csv_external1;
mc_oss_csv_external1的数据信息,请参见内置Extractor访问OSS。
执行成功后,您可以在OSS路径下查看到导出的文件。output文件夹下产生了一个.odps文件夹,包含.csv与.meta文件。
- 将分区表数据输出到OSS
insert into table output_data_txt_external2 partition (direction) select * from mc_oss_csv_external2;
mc_oss_csv_external2的数据信息,请参见内置Extractor访问OSS。
执行成功后,您可以在OSS路径下查看到导出的文件。output文件夹下根据INSERT语句指定的分区值生成对应的分区子目录。分区子目录中包括.odps文件夹。例如
output/direction=N/.odps/20210330*********/M1_0_0-0_TableSink1-0-.csv
。
- 将非分区表数据输出到OSS
通过OSS分片上传功能输出到OSS
您还可以使用OSS的分片上传(Multipart Upload)功能通过INSERT操作向OSS外部表写入数据。该功能需要在Session级别或Project级别设置odps.sql.unstructured.oss.commit.mode
属性进行开启或关闭。默认关闭。更多分片上传功能信息,请参见分片上传。
stored as xxx
的外部表,更多stored as xxx
信息,请参见支持开源格式数据。不支持自定义StorageHandler、Extractor以及SQL查询中携带UDF的场景。
在使用该功能前,您需要确认MaxCompute项目已启用jobconf2来生成作业执行计划,即odps.sql.jobconf.odps2
属性值为True。
odps.sql.jobconf.odps2
属性默认值为True,如果取值非True,请在Session级别执行set odps.sql.jobconf.odps2=true;
命令进行设置。
odps.sql.unstructured.oss.commit.mode
属性设置不同取值时的实现原理如下:
- 取值为False:MaxCompute写入到OSS外部表的数据,会存储在LOCATION目录下的.odps文件夹中。.odps文件夹中维护了一个.meta文件,用于保证MaxCompute数据的一致性。.odps的内容只有MaxCompute能正确处理,在其他数据处理引擎中可能无法正确解析,会导致报错。
- 取值为True:MaxCompute使用分片上传功能,以
two-phase commit
的方式保证数据的一致性,同时也不会有.odps目录以及.meta文件。兼容其他数据处理引擎。
基于数据开放的需求,通过SQL语句写入OSS的数据是需要能够被其他引擎使用的。您可以设置odps.sql.unstructured.oss.commit.mode
值为False,保留当前OSS外部表有.odps
目录的机制;另外设置odps.sql.unstructured.oss.commit.mode
值为True,启用不包含.odps
目录的行为,兼容开源引擎。
在文档使用中是否遇到以下问题
更多建议
匿名提交