本文为您介绍OSS Reader支持的数据类型、字段映射和数据源等参数及配置示例。

OSS Reader插件提供了读取OSS数据存储的能力。在底层实现上,OSS Reader使用OSS官方Java SDK获取OSS数据,并转换为数据同步传输协议传递给Writer。OSS Reader支持OSS中的BIGINT、DOUBLE、STRING、DATATIME和BOOLEAN数据类型。

OSS Reader实现了从OSS读取数据并转为数据集成协议的功能,OSS本身是无结构化数据存储。对于数据集成而言,OSS Reader支持的功能如下:
  • 支持且仅支持读取TXT格式的文件,且要求TXT中schema为一张二维表。
  • 支持类CSV格式文件,自定义分隔符。
  • 支持多种类型数据读取(使用String表示),支持列裁剪、列常量。
  • 支持递归读取、支持文件名过滤。
  • 支持文本压缩,现有压缩格式为gzipbzip2zip
    说明 一个压缩包不允许多文件打包压缩。
  • 多个Object可以支持并发读取。
OSS Reader暂时不能实现以下功能:
  • 单个Object(File)支持多线程并发读取。
  • 单个Object在压缩情况下,从技术上无法支持多线程并发读取。
  • 单个Object(File)超过100 GB。
参考文档如下:

支持的数据类型

类型分类 数据集成column配置类型 数据库数据类型
整数类 LONG LONG
字符串类 STRING STRING
浮点类 DOUBLE DOUBLE
布尔类 BOOLEAN BOOL
日期时间类 DATE DATE

参数说明

参数 描述 是否必选 默认值
datasource 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。
Object OSS的Object信息,此处可以支持填写多个Object。例如xxx的bucket中有yunshi文件夹,文件夹中有ll.txt文件,则Object直接填yunshi/ll.txt。
  • 当指定单个OSS Object时,OSS Reader暂时只能使用单线程进行数据抽取。后期将考虑在非压缩文件情况下针对单个Object可以进行多线程并发读取。
  • 当指定多个OSS Object时,OSS Reader支持使用多线程进行数据抽取。线程并发数通过通道数指定。
  • 当指定通配符时,OSS Reader尝试遍历出多个Object信息。例如配置 abc[0-9]表示abc0、abc1、abc2、abc3等。配置通配符会导致内存溢出,通常不建议您进行配置。详情请参见OSS产品概述
说明
  • 数据同步系统会将一个作业下同步的所有Object视作同一张数据表。您必须保证所有的Object能够适配同一套Schema信息。
  • 请注意控制单个目录下的文件个数,否则可能会触发系统OutOfMemoryError报错。如果遇到此情况,请将文件拆分到不同目录后再尝试进行同步。
column 读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不是从源头文件读取数据,而是根据value值自动生成对应的列。
默认情况下,您可以全部按照String类型读取数据,配置如下。
json
"column": ["*"]
您可以指定column字段信息,配置如下。
json
"column":
    {
       "type": "long",
       "index": 0    //从OSS文本第一列获取int字段。
    },
    {
       "type": "string",
       "value": "alibaba"  //从OSSReader内部生成alibaba的字符串字段作为当前字段。
    }
说明 对于您指定的column信息,type必须填写,index/value必须选择其一。
全部按照STRING类型读取。
fieldDelimiter 读取的字段分隔符。
说明 OSS Reader在读取数据时,需要指定字段分割符,如果不指定默认为(,),界面配置中也会默认填写为(,)。

如果分隔符不可见,请填写Unicode编码。例如,\u001b\u007c

,
compress 文本压缩类型,默认不填写(即不压缩)。支持压缩类型为gzipbzip2zip 不压缩
encoding 读取文件的编码配置。 utf-8
nullFormat 文本文件中无法使用标准字符串定义null(空指针),数据同步系统提供nullFormat定义哪些字符串可以表示为null。例如,您配置nullFormat="null",那么如果源头数据是"null",数据同步系统会视作null字段。针对空字符串,需要加一层转义:\N=\\N
skipHeader 类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过,压缩文件模式下不支持skipHeader false
csvReaderConfig 读取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值。

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向数据源
    参数 描述
    数据源 即上述参数说明中的datasource,通常填写您配置的数据源名称。
    文件名(含路径) 即上述参数说明中的Object
    说明 假如您的OSS文件名有根据每天的时间命名的部分,例如aaa/20171024abc.txt,关于Object系统参数可以设置为aaa/${bdp.system.bizdate}abc.txt
    列分隔符 即上述参数说明中的fieldDelimiter,默认值为(,)。
    编码格式 即上述参数说明中的encoding,默认值为utf-8
    null值 即上述参数说明中的nullFormat,将要表示为空的字段填入文本框,如果源端存在则将对应的部分转换为空。
    压缩格式 即上述参数说明中的compress,默认值为不压缩。
    是否包含表头 即上述参数说明中的skipHeader,默认值为No
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应的关系。单击添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击删除图标进行删除。字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置任务

脚本配置样例如下所示,具体参数填写请参见参数说明。
{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"oss",//插件名。
            "parameter":{
                "nullFormat":"",//定义可以表示为null的字符串。
                "compress":"",//文本压缩类型。
                "datasource":"",//数据源。
                "column":[//字段。
                    {
                        "index":0,//列序号。
                        "type":"string"//数据类型。
                    },
                    {
                        "index":1,
                        "type":"long"
                    },
                    {
                        "index":2,
                        "type":"double"
                    },
                    {
                        "index":3,
                        "type":"boolean"
                    },
                    {
                        "format":"yyyy-MM-dd HH:mm:ss", //时间格式。
                        "index":4,
                        "type":"date"
                    }
                ],
                "skipHeader":"",//类CSV格式文件可能存在表头为标题情况,需要跳过。
                "encoding":"",//编码格式。
                "fieldDelimiter":",",//字段分隔符。
                "fileFormat": "",//文本类型。
                "object":[]//object前缀。
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":""//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1 //作业并发数。
            "mbps":"12",//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

ORC或Parquet文件读取OSS

目前通过复用HDFS Reader的方式完成OSS读取ORC或Parquet格式的文件,在OSS Reader已有参数的基础上,增加了PathFileFormat等扩展配置参数,参数含义请参见HDFS Reader

  • 以ORC文件格式读取OSS,示例如下。
    {
          "stepType": "oss",
          "parameter": {
            "datasource": "",
            "fileFormat": "orc",
            "path": "/tests/case61/orc__691b6815_9260_4037_9899_****",
            "column": [
              {
                "index": 0,
                "type": "long"
              },
              {
                "index": "1",
                "type": "string"
              },
              {
                "index": "2",
                "type": "string"
              }
            ]
          }
        }
  • 以Parquet文件格式读取OSS,示例如下。
    {
    "type": "job",
    "version": "2.0",
    "steps": [
    {
    "stepType": "oss",
    "parameter": {
    "nullFormat": "",
    "compress": "",
    "fileFormat": "parquet",
    "path": "/*",
    "parquetSchema": "message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }",
    "column": [
    {
    "index": "0",
    "type": "string"
    },
    {
    "index": "1",
    "type": "long"
    },
    {
    "index": "2",
    "type": "string"
    },
    {
    "index": "3",
    "type": "string"
    },
    {
    "index": "4",
    "type": "string"
    },
    {
    "index": "5",
    "type": "string"
    },
    {
    "index": "6",
    "type": "string"
    },
    {
    "index": "7",
    "type": "string"
    },
    {
    "index": "8",
    "type": "string"
    },
    {
    "index": "9",
    "type": "string"
    },
    {
    "index": "10",
    "type": "double"
    },
    {
    "index": "11",
    "type": "string"
    },
    {
    "index": "12",
    "type": "string"
    }
    ],
    "skipHeader": "false",
    "encoding": "UTF-8",
    "fieldDelimiter": ",",
    "fieldDelimiterOrigin": ",",
    "datasource": "wpw_demotest_oss",
    "envType": 0,
    "object": [
    "wpw_demo/userdata1.parquet"
    ]
    },
    "name": "Reader",
    "category": "reader"
    },
    {
    "stepType": "odps",
    "parameter": {
    "partition": "dt=${bizdate}",
    "truncate": true,
    "datasource": "0_odps_wpw_demotest",
    "envType": 0,
    "column": [
    "id"
    ],
    "emptyAsNull": false,
    "table": "wpw_0827"
    },
    "name": "Writer",
    "category": "writer"
    }
    ],
    "setting": {
    "errorLimit": {
    "record": ""
    },
    "locale": "zh_CN",
    "speed": {
    "throttle": false,
    "concurrent": 2
    }
    },
    "order": {
    "hops": [
    {
    "from": "Reader",
    "to": "Writer"
    }
    ]
    }
    }