本文为您介绍HDFS Writer支持的数据类型、字段映射和数据源等参数及配置示例。

HDFS Writer提供向HDFS文件系统指定路径中写入TextFile文件、 ORCFile文件以及ParquetFile格式文件,文件内容可以与Hive中的表关联。开始配置HDFS Writer插件前,请首先配置好数据源,详情请参见配置Hive数据源
说明

使用限制

  • 目前HDFS Writer仅支持TextFile、ORCFile和ParquetFile三种格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表。
  • 由于HDFS是文件系统,不存在schema的概念,因此不支持对部分列写入。
  • 目前不支持DECIMAL、BINARY、ARRAYS、MAPS、STRUCTS和UNION等Hive数据类型。
  • 对于Hive分区表目前仅支持一次写入单个分区。
  • 对于TextFile,需要保证写入HDFS文件的分隔符与在Hive上创建表时的分隔符一致,从而实现写入HDFS数据与Hive表字段关联。
  • 目前插件中的Hive版本为1.1.1,Hadoop版本为2.7.1(Apache为适配JDK1.7)。在Hadoop2.5.0、Hadoop2.6.0和Hive1.2.0测试环境中写入正常。

实现过程

HDFS Writer的实现过程如下所示:
  1. 根据您指定的path,创建一个HDFS文件系统上不存在的临时目录。

    创建规则:path_随机

  2. 将读取的文件写入这个临时目录。
  3. 全部写入后,将临时目录下的文件移动到您指定的目录(在创建文件时保证文件名不重复)。
  4. 删除临时目录。如果在此过程中,发生网络中断等情况造成无法与HDFS建立连接,需要您手动删除已经写入的文件和临时目录。
说明 数据同步需要使用Admin账号,并且有访问相应文件的读写权限。

数据类型转换

目前HDFS Writer支持大部分Hive类型,请注意检查您的数据类型。

HDFS Writer针对Hive数据类型的转换列表,如下所示。

说明 column的配置需要和Hive表对应的列类型保持一致。
类型分类 数据库数据类型
整数类 TINYINT、SMALLINT、INT和BIGINT
浮点类 FLOAT和DOUBLE
字符串类 CHAR、VARCHAR和STRING
布尔类 BOOLEAN
日期时间类 DATE和TIMESTAMP

参数说明

参数 描述 是否必选 默认值
defaultFS Hadoop HDFS文件系统namenode节点地址,例如hdfs://127.0.0.1:9000。公共资源组不支持Hadoop高级参数HA的配置,请新增自定义数据集成资源组
fileType 文件的类型,目前仅支持您配置为textorcparquet
  • text:表示TextFile文件格式。
  • orc:表示ORCFile文件格式。
  • parquet:表示普通parquet file文件格式。
path 存储到Hadoop HDFS文件系统的路径信息,HDFS Writer会根据并发配置在path目录下写入多个文件。

为了与Hive表关联,请填写Hive表在HDFS上的存储路径。例如Hive上设置的数据仓库的存储路径为/user/hive/warehouse/,已建立数据库test表hello,则对应的存储路径为/user/hive/warehouse/test.db/hello

fileName HDFS Writer写入时的文件名,实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。
column 写入数据的字段,不支持对部分列写入。

为了与Hive中的表关联,需要指定表中所有字段名和字段类型,其中name指定字段名,type指定字段类型。

您可以指定column字段信息,配置如下。
"column": 
[
    {
        "name": "userName",
        "type": "string"
    },
    {
        "name": "age",
        "type": "long"
    }
]
是(如果filetype为parquet,此项无需填写)
writeMode HDFS Writer写入前数据清理处理模式:
  • append:写入前不做任何处理,数据集成HDFS Writer直接使用filename写入,并保证文件名不冲突。
  • nonConflict:如果目录下有fileName前缀的文件,直接报错。
  • truncate:写入前清理fileName名称前缀匹配的所有文件。例如,"fileName": "abc",将清理对应目录所有abc开头的文件。
说明 Parquet格式文件不支持Append,所以只能是noConflict
fieldDelimiter HDFS Writer写入时的字段分隔符,需要您保证与创建的Hive表的字段分隔符一致,否则无法在Hive表中查到数据。 是(如果filetype为parquet,此项无需填写)
compress HDFS文件压缩类型,默认不填写,则表示没有压缩。

其中text类型文件支持gzip和bzip2压缩类型,orc类型文件支持SNAPPY压缩类型(需要您安装SnappyCodec)。

encoding 写文件的编码配置。 无压缩
parquetSchema 写Parquet格式文件时的必填项,用来描述目标文件的结构,所以此项当且仅当fileTypeparquet时生效,格式如下。
message MessageType名 {
是否必填, 数据类型, 列名;
......................;
}
配置项说明如下:
  • MessageType名:填写名称。
  • 是否必填:required表示非空,optional表示可为空。推荐全填optional。
  • 数据类型:Parquet文件支持BOOLEAN、INT32、INT64、INT96、FLOAT、DOUBLE、BINARY(如果是字符串类型,请填BINARY)和FIXED_LEN_BYTE_ARRAY等类型。
说明 每行列设置必须以分号结尾,最后一行也要写上分号。
示例如下。
message m {
optional int64 id;
optional int64 date_id;
optional binary datetimestring;
optional int32 dspId;
optional int32 advertiserId;
optional int32 status;
optional int64 bidding_req_num;
optional int64 imp;
optional int64 click_num;
}
hadoopConfig hadoopConfig中可以配置与Hadoop相关的一些高级参数,例如HA的配置。公共资源组不支持Hadoop高级参数HA的配置,请新增自定义数据集成资源组
"hadoopConfig":{
"dfs.nameservices": "testDfs",
"dfs.ha.namenodes.testDfs": "namenode1,namenode2",
"dfs.namenode.rpc-address.youkuDfs.namenode1": "",
"dfs.namenode.rpc-address.youkuDfs.namenode2": "",
"dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
dataxParquetMode 针对Parquet文件进行同步的模式。使用fields支持array、map和struct等复杂类型。可选值包括fields和columns。
配置dataxParquetMode为fields时,支持hdfs over oss,即HDFS的存储为OSS,OSS的数据存储格式为parquet。此时您可以在hadoopConfig中增加OSS相关的参数,详情如下:
  • fs.oss.accessKeyId:访问OSS的AccessKeyID。
  • fs.oss.accessKeySecret:访问OSS的AccessKeySecret。
  • fs.oss.endpoint:访问OSS的endpoint。
访问OSS的示例如下所示。
```json
    "writer": {
    "name": "hdfswriter",
    "parameter": {
        "defaultFS": "oss://test-bucket",
        "fileType": "parquet",
        "path": "/datasets/oss_demo/kpt",
        "fileName": "test",
        "writeMode": "truncate",
        "compress": "SNAPPY",
        "encoding": "UTF-8",
        "hadoopConfig": {
            "fs.oss.accessKeyId": "the-access-id",
            "fs.oss.accessKeySecret": "the-access-key",
            "fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com"
            },
            "parquetSchema": "message test {\n    required int64 id;\n    optional binary name (UTF8);\n    optional int64 gmt_create;\n    required group map_col (MAP) {\n        repeated group key_value {\n            required binary key (UTF8);\n            required binary value (UTF8);\n        }\n    }\n    required group array_col (LIST) {\n        repeated group list {\n            required binary element (UTF8);\n        }\n    }\n    required group struct_col {\n        required int64 id;\n        required binary name (UTF8);\n    }    \n}",
            "dataxParquetMode": "fields"
            }
        }
    ```
columns
haveKerberos 是否有Kerberos认证,默认为false。如果您配置为true,则配置项kerberosKeytabFilePathkerberosPrincipal为必填。 false
kerberosKeytabFilePath Kerberos认证keytab文件的绝对路径。 如果haveKerberostrue,则必选。
kerberosPrincipal Kerberos认证Principal名,如****/hadoopclient@**.*** 。如果haveKerberostrue,则必选。
由于Kerberos需要配置keytab认证文件的绝对路径,您需要在自定义资源组上使用此功能。配置示例如下。
"haveKerberos":true,
"kerberosKeytabFilePath":"/opt/datax/**.keytab",
"kerberosPrincipal":"**/hadoopclient@**.**"

向导开发介绍

暂不支持向导开发模式开发。

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置任务

脚本配置示例如下,详情请参见上述参数说明。
{
    "type": "job",
    "version": "2.0",//版本号。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "hdfs",//插件名。
            "parameter": {
                "path": "",//存储到Hadoop HDFS文件系统的路径信息。
                "fileName": "",//HDFS Writer写入时的文件名。
                "compress": "",//HDFS文件压缩类型。
                "datasource": "",//数据源。
                "column": [
                    {
                        "name": "col1",//字段名。
                        "type": "string"//字段类型。
                    },
                    {
                        "name": "col2",
                        "type": "int"
                    },
                    {
                        "name": "col3",
                        "type": "double"
                    },
                    {
                        "name": "col4",
                        "type": "boolean"
                    },
                    {
                        "name": "col5",
                        "type": "date"
                    }
                ],
                "writeMode": "",//写入模式。
                "fieldDelimiter": ",",//列分隔符。
                "encoding": "",//编码格式。
                "fileType": "text"//文本类型。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""//错误记录数。
        },
        "speed": {
            "concurrent": 3,//作业并发数。
            "throttle": false,//false代表不限流,下面的限流的速度不生效;true代表限流。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}