介绍如何使用Hive/HadoopMR访问表格存储中的表。

数据准备

在表格存储中准备一张数据表pet,name是唯一的一列主键,数据示例如下。

说明 表中空白部分无需写入,因为表格存储是schema-free的存储结构,没有值也无需写入NULL。
name owner species sex birth death
Fluffy Harold cat f 1993-02-04
Claws Gwen cat m 1994-03-17
Buffy Harold dog f 1989-05-13
Fang Benny dog m 1990-08-27
Bowser Diane dog m 1979-08-31 1995-07-29
Chirpy Gwen bird f 1998-09-11
Whistler Gwen bird 1997-12-09
Slim Benny snake m 1996-04-29
Puffball Diane hamster f 1999-03-30

Hive访问示例

前提条件

按照准备工作准备好Hadoop、Hive、JDK环境以及表格存储Java SDK和EMR SDK依赖包。

示例

# HADOOP_HOME及HADOOP_CLASSPATH可以添加到/etc/profile中。
$ export HADOOP_HOME=${您的Hadoop安装目录}
$ export HADOOP_CLASSPATH=emr-tablestore-1.4.2.jar:tablestore-4.3.1-jar-with-dependencies.jar:joda-time-2.9.4.jar
$ bin/hive
hive> CREATE EXTERNAL TABLE pet
  (name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
  STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
  WITH SERDEPROPERTIES(
    "tablestore.columns.mapping"="name,owner,species,sex,birth,death")
  TBLPROPERTIES (
    "tablestore.endpoint"="YourEndpoint",
    "tablestore.access_key_id"="YourAccessKeyId",
    "tablestore.access_key_secret"="YourAccessKeySecret",
    "tablestore.table.name"="pet");
hive> SELECT * FROM pet;
Bowser  Diane   dog     m       1979-08-31      1995-07-29
Buffy   Harold  dog     f       1989-05-13      NULL
Chirpy  Gwen    bird    f       1998-09-11      NULL
Claws   Gwen    cat     m       1994-03-17      NULL
Fang    Benny   dog     m       1990-08-27      NULL
Fluffy  Harold  cat     f       1993-02-04      NULL
Puffball        Diane   hamster f       1999-03-30      NULL
Slim    Benny   snake   m       1996-04-29      NULL
Whistler        Gwen    bird    NULL    1997-12-09      NULL
Time taken: 5.045 seconds, Fetched 9 row(s)
hive> SELECT * FROM pet WHERE birth > "1995-01-01";
Chirpy  Gwen    bird    f       1998-09-11      NULL
Puffball        Diane   hamster f       1999-03-30      NULL
Slim    Benny   snake   m       1996-04-29      NULL
Whistler        Gwen    bird    NULL    1997-12-09      NULL
Time taken: 1.41 seconds, Fetched 4 row(s)
            
参数说明如下:
  • WITH SERDEPROPERTIES

    tablestore.columns.mapping(可选):在默认的情况下,外表的字段名即为表格存储上表的列名(主键列名或属性列名)。但有时外表的字段名和表上列名并不一致(比如处理大小写或字符集相关的问题),此时需要指定tablestore.columns.mapping。该参数为一个英文逗号分隔的字符串,每个分隔之间不能添加空格,每一项都是表上列名,顺序与外表字段一致。

    说明 表格存储的列名支持空白字符,所以空白也会被认为是表上列名的一部分。
  • TBLPROPERTIES
    • tablestore.endpoint(必填):访问表格存储的服务地址,可以在表格存储控制台上查看实例的endpoint信息。

    • tablestore.instance(可选):表格存储的实例名。若不填,则为tablestore.endpoint的第一段。

    • tablestore.table.name(必填):表格存储上对应的表名。

    • tablestore.access_key_id、tablestore.access_key_secret(必填):更多信息,请参见访问控制

    • tablestore.sts_token(可选):更多信息,请参见配置用户权限

HadoopMR访问示例

以下示例介绍如何使用HadoopMR程序统计数据表pet的行数。

示例

  • 构建Mappers和Reducers
    public class RowCounter {
    public static class RowCounterMapper
    extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> {
        private final static Text agg = new Text("TOTAL");
        private final static LongWritable one = new LongWritable(1);
    
        @Override
        public void map(
            PrimaryKeyWritable key, RowWritable value, Context context)
            throws IOException, InterruptedException {
            context.write(agg, one);
        }
    }
    
    public static class IntSumReducer
    extends Reducer<Text,LongWritable,Text,LongWritable> {
    
        @Override
        public void reduce(
            Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
    }
                        

    数据源每从表格存储上读出一行,都会调用一次mapper的map()。前两个参数PrimaryKeyWritable和RowWritable分别对应这行的主键以及这行的内容。可以通过调用PrimaryKeyWritable.getPrimaryKey()和RowWritable.getRow()取得表格存储Java SDK定义的主键对象及行对象。

  • 配置表格存储作为mapper的数据源。
    private static RangeRowQueryCriteria fetchCriteria() {
        RangeRowQueryCriteria res = new     RangeRowQueryCriteria("YourTableName");
        res.setMaxVersions(1);
        List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
        List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
        lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
        upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
        res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
        res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
        return res;
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "row count");
        job.addFileToClassPath(new Path("hadoop-connector.jar"));
        job.setJarByClass(RowCounter.class);
        job.setMapperClass(RowCounterMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TableStoreInputFormat.class);
        TableStoreInputFormat.setEndpoint(job, "https://YourInstance.Region.ots.aliyuncs.com/");
        TableStoreInputFormat.setCredential(job, "YourAccessKeyId", "YourAccessKeySecret");
        TableStoreInputFormat.addCriteria(job, fetchCriteria());
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }                    

    示例中使用job.setInputFormatClass(TableStoreInputFormat.class)把表格存储设为数据源,除此之外,还需要:

    • 把hadoop-connector.jar部署到集群上并添加到classpath里面。路径为addFileToClassPath()指定hadoop-connector.jar的本地路径。代码中假定hadoop-connector.jar在当前路径。
    • 访问表格存储需要指定入口和身份。通过TableStoreInputFormat.setEndpoint()和TableStoreInputFormat.setCredential()设置访问表格存储需要指定的Endpoint和Access Key信息。
    • 指定一张表用来计数。

      说明
      • 每调用一次addCriteria()可以在数据源里添加一个Java SDK定义的RangeRowQueryCriteria对象。可以多次调用addCriteria()。RangeRowQueryCriteria对象与表格存储Java SDK GetRange接口所用的RangeRowQueryCriteria对象具有相同的限制条件。
      • 使用RangeRowQueryCriteria的setFilter()和addColumnsToGet()可以在表格存储的服务器端过滤掉不必要的行和列,减少访问数据的大小,降低成本,提高性能。
      • 通过添加对应多张表的多个RangeRowQueryCriteria,可以实现多表的union。
      • 通过添加同一张表的多个RangeRowQueryCriteria,可以做到更均匀的切分。TableStore-Hadoop Connector会根据一些策略将用户传入的范围切细。

程序运行示例

$ HADOOP_CLASSPATH=hadoop-connector.jar bin/hadoop jar row-counter.jar
...
$ find output -type f
output/_SUCCESS
output/part-r-00000
output/._SUCCESS.crc
output/.part-r-00000.crc
$ cat out/part-r-00000
TOTAL   9           

类型转换说明

表格存储支持的数据类型和Hive/Spark支持的数据类型不完全相同。

下表列出了从表格存储的数据类型(行)转换到Hive/Spark数据类型(列)的支持情况。

类型转换 TINYINT SMALLINT INT BIGINT FLOAT DOUBLE BOOLEAN STRING BINARY
INTEGER 支持,损失精度 支持,损失精度 支持,损失精度 支持 支持,损失精度 支持,损失精度 不支持 不支持 不支持
DOUBLE 支持,损失精度 支持,损失精度 支持,损失精度 支持,损失精度 支持,损失精度 支持 不支持 不支持 不支持
BOOLEAN 不支持 不支持 不支持 不支持 不支持 不支持 支持 不支持 不支持
STRING 不支持 不支持 不支持 不支持 不支持 不支持 不支持 支持 不支持
BINARY 不支持 不支持 不支持 不支持 不支持 不支持 不支持 不支持 支持