表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储。您可以将数据写入到表格存储中,同时在函数计算中对新增的数据做简单的清洗,将清洗后的数据写回到表格存储表中,并能实时访问原始和结果数据。
样例场景
如果写入的为日志数据,日志数据包括如下三个字段。为了便于日志查询,您需要将level>1的日志写入到另一张数据表中。
字段名称 |
类型 |
含义 |
id |
整型 |
日志id。 |
level |
整型 |
日志的等级,数值越大表示日志等级越高。 |
message |
字符串 |
日志的内容。 |
创建函数和触发器
使用触发器功能需要先开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。由于触发器只能绑定已有函数,所以需在和表格存储实例相同的地域中创建函数。
- 创建开通了Stream流的数据表。
- 在表格存储控制台创建实例。
- 在该实例下创建数据表,并且开启数据表的Stream功能。
- 创建函数。
- 登录函数计算控制台。
- 在左侧导航栏,单击服务 / 函数。
- 在服务 / 函数页面,单击新建函数。
- 在新建函数页面,选择事件函数,单击下一步。
- 根据实际需要配置函数信息,单击完成。
- 函数名称为etl_test,选择python2.7环境。
- 函数入口为etl_test.handler。

- 配置服务。
- 在左侧导航栏,单击服务 / 函数。
- 在服务 / 函数页面,单击服务配置。
- 在服务配置页签,可配置服务的角色,详情请参见函数计算权限模型。
由于函数计算需要将运行中的日志写入到日志服务中,同时需要对表格存储的表进行读写,所以需要对函数计算进行授权,此处以添加AliyunOTSFullAccess与AliyunLogFullAccess权限为例介绍,如下图所示。实际使用时建议根据权限最小原则来添加权限。

- 修改函数代码。
- 在服务 / 函数页面,单击函数名称。
- 在函数详情页面,单击代码执行。
- 在代码执行页签,编辑代码并保存。
其中INSTANCE_NAME(表格存储的实例名称)、REGION(使用的区域)、ENDPOINT(服务地址)需要根据情况进行修改。
使用示例代码如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cbor
import json
import tablestore as ots
INSTANCE_NAME = 'distribute-test'
REGION = 'cn-shanghai'
ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com'%(INSTANCE_NAME, REGION)
RESULT_TABLENAME = 'result'
def _utf8(input):
return str(bytearray(input, "utf-8"))
def get_attrbute_value(record, column):
attrs = record[u'Columns']
for x in attrs:
if x[u'ColumnName'] == column:
return x['Value']
def get_pk_value(record, column):
attrs = record[u'PrimaryKey']
for x in attrs:
if x['ColumnName'] == column:
return x['Value']
#由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限。
def get_ots_client(context):
creds = context.credentials
client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
return client
def save_to_ots(client, record):
id = int(get_pk_value(record, 'id'))
level = int(get_attrbute_value(record, 'level'))
msg = get_attrbute_value(record, 'message')
pk = [(_utf8('id'), id),]
attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
row = ots.Row(pk, attr)
client.put_row(RESULT_TABLENAME, row)
def handler(event, context):
records = cbor.loads(event)
#records = json.loads(event)
client = get_ots_client(context)
for record in records['Records']:
level = int(get_attrbute_value(record, 'level'))
if level > 1:
save_to_ots(client, record)
else:
print "Level <= 1, ignore."
- 创建和测试Tablestore触发器。
说明 首次通过表格存储控制台使用函数计算时,请使用老版控制台授予表格存储发送事件通知的权限。
- 在表格存储控制台数据表的触发器管理页签,单击使用已有函数计算。
- 在创建触发器对话框,选择函数计算服务和函数计算函数,输入触发器名称。
说明 当使用老版控制台首次创建触发器时,需要选中
授予表格存储发送事件通知的权限,授予表格存储发送事件通知的权限。
授权后,可以在RAM控制台查看到自动创建的授权角色AliyunTableStoreStreamNotificationRole。
- 单击确定。
运行验证
创建函数和触发器后,通过在表格存储中写入和查询数据验证数据清洗是否成功。
向source_data表中写入数据,依次填入id、level和message信息,并在result表中查询清洗后的数据,详请请参见
控制台读写数据。
- 当向soure_data表中写入level>1的数据时,数据会同步到result表中。
- 当向soure_data表中写入level<=1的数据时,数据不会同步到result表中。
在文档使用中是否遇到以下问题
更多建议
匿名提交