本文为您介绍MaxCompute表的基本操作。包括创建表、创建表的Schema、同步表更新、获取表数据、删除表、表分区操作以及如何将表转换为DataFrame对象。

基本操作

表是MaxCompute的数据存储单元,详情请参见。表基本操作如下:
  • 列出项目空间下的所有表:入口对象的list_tables()方法可以列出项目空间下的所有表。
    # 列出每个表。
    for table in o.list_tables():
  • 判断表是否存在:入口对象的exist_table()方法可以判断表是否存在。
  • 获取表:入口对象的get_table()方法可以获取表。
    t = o.get_table('dual')
    t.schema
    odps.Schema {
      c_int_a                 bigint
      c_int_b                 bigint
      c_double_a              double
      c_double_b              double
      c_string_a              string
      c_string_b              string
      c_bool_a                boolean
      c_bool_b                boolean
      c_datetime_a            datetime
      c_datetime_b            datetime
    }
    t.lifecycle
    -1
    print(t.creation_time)
    2014-05-15 14:58:43
    t.is_virtual_view
    False
    t.size
    1408
    t.comment
    'Dual Table Comment'
    t.schema.columns
    [<column c_int_a, type bigint>,
     <column c_int_b, type bigint>,
     <column c_double_a, type double>,
     <column c_double_b, type double>,
     <column c_string_a, type string>,
     <column c_string_b, type string>,
     <column c_bool_a, type boolean>,
     <column c_bool_b, type boolean>,
     <column c_datetime_a, type datetime>,
     <column c_datetime_b, type datetime>]
    t.schema['c_int_a']
    <column c_int_a, type bigint>
    t.schema['c_int_a'].comment
    'Comment of column c_int_a'
    您可以通过project参数,跨项目获取表。
    t = o.get_table('dual', project='other_project')

创建表的Schema

初始化方法有如下两种:
  • 通过表的列以及可选的分区进行初始化。
    from odps.models import Schema, Column, Partition
    columns = [Column(name='num', type='bigint', comment='the column'),
               Column(name='num2', type='double', comment='the column2')]
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)
    schema.columns
    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]
    schema.partitions
    [<partition pt, type string>]
    schema.names  # 获取非分区字段的字段名。
    ['num', 'num2']
    schema.types  # 获取非分区字段的字段类型。
    [bigint, double]
  • 使用Schema.from_lists()方法。该方法更容易调用,但无法直接设置列和分区的注释。
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    schema.columns
    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

创建表

使用create_table()方法创建表的方式有如下两种:
  • 使用表Schema创建表,方法如下。
    table = o.create_table('my_new_table', schema)
    table = o.create_table('my_new_table', schema, if_not_exists=True)  # 只有不存在表时,才创建表。
    table = o.create_table('my_new_table', schema, lifecycle=7)  # 设置生命周期。
  • 采用字段名及字段类型字符串创建表,方法如下。
    # 创建非分区表。
    table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
    # 创建分区表可传入(表字段列表,分区字段列表)。
    table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
    未打开新数据类型开关时,创建表的数据类型只允许为BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY类型。如果您需要支持TINYINT和STRUCT等新数据类型,可以打开options.sql.use_odps2_extension = True开关,示例如下。
    from odps import options
    options.sql.use_odps2_extension = True
    table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

同步表更新

当一个表被其他程序更新,例如改变了Schema,可以调用reload()方法同步表的更新。

table.reload()

向表中插入一行记录

Record表示表的一行记录,对表对象调用new_record()方法即可创建一个新的Record。

t = o.get_table('mytable')
r = t.new_record(['val0', 'val1'])  # 值的个数必须等于表Schema的字段数。
r2 = t.new_record()     # 可以不传入值。
r2[0] = 'val0' # 通过偏移设置值。
r2['field1'] = 'val1'  # 通过字段名设置值。
r2.field1 = 'val1'  # 通过属性设置值。

print(record[0])  # 取第0个位置的值。
print(record['c_double_a'])  # 通过字段取值。
print(record.c_double_a)  # 通过属性取值。
print(record[0: 3])  # 切片操作。
print(record[0, 2, 3])  # 取多个位置的值。
print(record['c_int_a', 'c_double_a'])  # 通过多个字段取值。

写入数据

  • 使用入口对象的write_table()方法写入数据。
    records = [[111, 'aaa', True],                 # 此处可以是list。
              [222, 'bbb', False],
              [333, 'ccc', True],
              [444, '中文', False]]
    o.write_table('test_table', records, partition='pt=test', create_partition=True)
    说明
    • 每次调用write_table()方法,MaxCompute都会在服务端生成一个文件。该操作耗时较长,同时文件过多会降低后续的查询效率。因此,建议您在使用此方法时,一次性写入多组数据,或者传入一个生成器对象。
    • 调用write_table()方法向表中写入数据时会追加到原有数据中。PyODPS不提供覆盖数据的选项,如果需要覆盖数据,请手动清除原有数据。对于非分区表,需要调用table.truncate()方法;对于分区表,需要删除分区后再建立新的分区。
  • 对表对象调用open_writer()方法写入数据。
    with t.open_writer(partition='pt=test') as writer:
    records = [[111, 'aaa', True],                 #此处可以是List。
              [222, 'bbb', False],
              [333, 'ccc', True],
              [444, '中文', False]]
    writer.write(records)  #这里Records可以是可迭代对象。
    
    with t.open_writer(partition='pt1=test1,pt2=test2') as writer: #多级分区写法。
    
    records = [t.new_record([111, 'aaa', True]),   #也可以是Record对象。
               t.new_record([222, 'bbb', False]),
               t.new_record([333, 'ccc', True]),
               t.new_record([444, '中文', False])]
    writer.write(records) 
    如果分区不存在,可以使用create_partition参数指定创建分区,如下所示。
    with t.open_writer(partition='pt=test', create_partition=True) as writer:
         records = [[111, 'aaa', True],                 # 这里可以是List。
                    [222, 'bbb', False],
                    [333, 'ccc', True],
                    [444, '中文', False]]
       writer.write(records)  # 此处的Records可以是可迭代对象。
  • 使用多进程并行写数据。
    每个进程写数据时共享同一个Session_ID,但是有不同的Block_ID。每个Block对应服务端的一个文件。主进程执行Commit,完成数据上传。
    import random
    from multiprocessing import Pool
    from odps.tunnel import TableTunnel
    
    def write_records(session_id, block_id):
        # 使用指定的ID创建Session。
        local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
        # 创建Writer时指定Block_ID。
        with local_session.open_record_writer(block_id) as writer:
            for i in range(5):
                # 生成数据并写入对应Block。
                record = table.new_record([random.randint(1, 100), random.random()])
                writer.write(record)
    
    if __name__ == '__main__':
        N_WORKERS = 3
    
        table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
        tunnel = TableTunnel(o)
        upload_session = tunnel.create_upload_session(table.name)
    
        # 每个进程使用同一个Session_ID。
        session_id = upload_session.id
    
        pool = Pool(processes=N_WORKERS)
        futures = []
        block_ids = []
        for i in range(N_WORKERS):
            futures.append(pool.apply_async(write_records, (session_id, i)))
            block_ids.append(i)
        [f.get() for f in futures]
    
        # 最后执行Commit,并指定所有Block。
        upload_session.commit(block_ids)

获取表数据

获取表数据的方法有多种,常用方法如下:
  • 使用入口对象的read_table()方法。
    for record in o.read_table('test_table', partition='pt=test'):
    # 处理一条记录。
  • 如果您仅需要查看每个表最开始的小于1万条数据,可以对表对象调用head()方法。
    t = o.get_table('dual')
    # 处理每个Record对象。
    for record in t.head(3):
  • 调用open_reader()方法读取数据。
    • 使用with表达式的写法如下。
      with t.open_reader(partition='pt=test') as reader:
      count = reader.count
      for record in reader[5:10]  # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。
      # 处理一条记录。
    • 不使用with表达式的写法如下。
      reader = t.open_reader(partition='pt=test')
      count = reader.count
      for record in reader[5:10]  # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。
      # 处理一条记录。

删除表

使用delete_table()方法删除已经存在的表。
o.delete_table('my_table_name', if_exists=True)  # 只有表存在时,才删除表。
 t.drop()  # Table对象存在时,直接调用Drop方法删除。

创建DataFrame

PyODPS提供了DataFrame框架,支持以更方便的方式查询和操作MaxCompute数据。使用to_df()方法,即可转化为DataFrame对象。

table = o.get_table('my_table_name')
df = table.to_df()

表分区

  • 判断是否为分区表。
    if table.schema.partitions:
     print('Table %s is partitioned.' % table.name)
  • 遍历表全部分区。
    for partition in table.partitions:
         print(partition.name)
    for partition in table.iterate_partitions(spec='pt=test'):
    # 遍历二级分区。
  • 判断分区是否存在。
    table.exist_partition('pt=test,sub=2015')
  • 获取分区。
    partition = table.get_partition('pt=test')
     print(partition.creation_time)
    2015-11-18 22:22:27
     partition.size
    0
  • 创建分区。
    t.create_partition('pt=test', if_not_exists=True)  # 指定if_not_exists参数,分区不存在时才创建分区。
  • 删除分区。
    t.delete_partition('pt=test', if_exists=True)  # 指定if_exists参数,分区存在时才删除分区。
    partition.drop()  # 分区对象存在时,直接对分区对象调用Drop方法删除。

数据上传下载通道

Tunnel是MaxCompute的数据通道,用户可以通过Tunnel向MaxCompute中上传或者下载数据。

  • 上传数据示例
    from odps.tunnel import TableTunnel
    
    table = o.get_table('my_table')
    
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    
    with upload_session.open_record_writer(0) as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
    
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    
    upload_session.commit([0])
  • 下载数据示例
    from odps.tunnel import TableTunnel
    
    tunnel = TableTunnel(odps)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
    
    with download_session.open_record_reader(0, download_session.count) as reader:
        for record in reader:
    # 处理每条记录。
说明
  • PyODPS不支持上传外部表,例如OSS和OTS的表。
  • 不推荐直接使用Tunnel接口,推荐您直接使用表对象的写和读接口。
  • 如果您安装了CPython,在安装PyODPS时会编译C代码,加速Tunnel的上传和下载。