MaxCompute Python 2 UDF包括UDF、UDAF和UDTF三种函数。本文为您介绍如何通过MaxCompute Python 2实现这三种函数。
使用限制
- 读写本地文件。
- 启动子进程。
- 启动线程。
- 使用Socket通信。
- 其他系统调用。
基于上述原因,您上传的代码都必须通过标准Python实现,禁止C扩展模块。
- 所有标准Python实现(不依赖扩展模块)的模块都可用。
- C实现的扩展模块中下列模块可用:
- array、audioop
- binascii、bisect
- cmath、_codecs_cn、_codecs_hk、_codecs_iso2022、_codecs_jp、_codecs_kr、_codecs_tw、_collections、cStringIO
- datetime
- _functools、future_builtins、
- _heapq、_hashlib
- itertools
- _json
- _locale、_lsprof
- math、_md5、_multibytecodec
- operator
- _random
- _sha256、_sha512、_sha、_struct、strop
- time
- unicodedata
- _weakref
- cPickle
- 部分模块功能受限。例如,沙箱限制了您的代码最多可往标准输出和标准错误输出写入数据的大小为20 KB,即
sys.stdout/sys.stderr
最多能写20 KB,多余的字符会被忽略。
第三方库
参数与返回值
@odps.udf.annotate(signature)
Python UDF支持的MaxCompute SQL数据类型包括BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、复杂数据类型(ARRAY、MAP和STRUCT)和复杂数据类型嵌套。在执行SQL语句前,必须确定所有函数的参数类型和返回值类型,因此对于Python这一动态类型语言,需要通过对UDF类加Decorator的方式指定函数签名。
arg_type_list '->' type_list
arg_type_list: type_list | '*' | '' | 'char(n)' | 'varchar(n)'
type_list: [type_list ','] type
type: 'bigint' | 'string' | 'double' | 'boolean' | 'datetime' | 'float' | 'binary' | 'date' | 'decimal' | 'decimal(precision,scale)'
- 箭头左边表示参数类型,右边表示返回值类型。
- 只有UDTF的返回值可以是多列,UDF和UDAF只能返回一列。
- 星号(
*
)代表变长参数。使用变长参数时,UDF、UDTF和UDAF可以匹配任意输入参数。 - 如果项目空间使用2.0数据类型版本,decimal可以设置precision和scale,复杂类型写法请参见2.0数据类型版本。
- 返回值暂不支持CHAR和VARCHAR类型。
'bigint,double->string' # 参数类型为BIGINT、DOUBLE,返回值类型为STRING。
'bigint,boolean->string,datetime' # UDTF参数类型为BIGINT、BOOLEAN,返回值类型为STRING,DATETIME。
'*->string' # 变长参数,输入参数类型任意,返回值类型为STRING。
'->double' # 参数为空,返回值类型为DOUBLE。
'array<bigint>->struct<x:string, y:int>' # 参数类型为ARRAY<BIGINT>,返回值类型为STRUCT<x:STRING, y:INT>。
'->map<bigint, string>' # 参数为空,返回值类型为MAP<BIGINT, STRING>。
查询语义解析阶段会检查不符合函数签名的用法,并返回错误,禁止执行此函数。执行时,UDF函数的参数会以函数签名指定的类型传入。返回值类型也要与函数签名指定的类型一致,否则检查到类型不匹配时也会报错。
MaxCompute SQL Type | Python 2 Type |
---|---|
BIGINT | INT |
STRING | STR |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
DATETIME | INT |
FLOAT | FLOAT |
CHAR | STR |
VARCHAR | STR |
BINARY | BYTEARRAY |
DATE | INT |
DECIMAL | DECIMAL.DECIMAL |
ARRAY | LIST |
MAP | DICT |
STRUCT | COLLECTIONS.NAMEDTUPLE |
- DATETIME类型对应的Python类型是INT,值为Epoch UTC Time起至今的毫秒数。您可以通过Python标准库中的DATETIME模块处理日期时间类型。
odps.udf.int(value,[silent=True])
增加了参数silent
。当silent
为True时,如果value
无法转为INT,则会返回None(不会返回异常)。- NULL值对应Python的None。
UDF
evaluate
方法,即可实现Python UDF。from odps.udf import annotate
@annotate("bigint,bigint->bigint")
class MyPlus(object):
def evaluate(self, arg0, arg1):
if None in (arg0, arg1):
return None
return arg0 + arg1
annotate
指定函数签名。
Python UDF的使用示例请参考使用MaxCompute分析IP来源最佳实践。
UDAF
class odps.udf.BaseUDAF
:继承此类实现Python UDAF。BaseUDAF.new_buffer()
:返回聚合函数的中间值的buffer
。buffer
必须是Marshal对象(例如LIST、DICT),并且buffer
的大小不应该随数据量递增。在极限情况下,buffer
在执行对象序列化后的大小不应该超过2 MB。BaseUDAF.iterate(buffer[, args, ...])
:将args
聚合到中间值buffer
中。BaseUDAF.merge(buffer, pbuffer)
:将两个中间值buffer
聚合到一起,即将pbuffer
合并到buffer
中。BaseUDAF.terminate(buffer)
:将中间值buffer
转换为MaxCompute SQL的基本类型。
@annotate('double->double')
class Average(BaseUDAF):
def new_buffer(self):
return [0, 0]
def iterate(self, buffer, number):
if number is not None:
buffer[0] += number
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def terminate(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]
UDTF
class odps.udf.BaseUDTF
:Python UDTF的基类。您可以继承此类实现process
或close
等方法。BaseUDTF.__init__()
:初始化方法。继承类如果需要实现此方法,必须在一开始调用基类的初始化方法super(BaseUDTF, self).__init__()
。init
方法在整个UDTF生命周期中只会被调用一次,即在处理第一条记录之前。当UDTF需要保存内部状态时,可以通过此方法初始化所有状态。BaseUDTF.process([args, ...])
:此方法由MaxCompute SQL框架调用,SQL中每一条记录都会调用一次process
,process
的参数为SQL语句中指定的UDTF输入参数。BaseUDTF.forward([args, ...])
:UDTF的输出方法。此方法由用户代码调用。每调用一次forward
,便会输出一条记录。forward
的参数为SQL语句中指定的UDTF的输出参数。BaseUDTF.close()
:UDTF的结束方法。此方法由MaxCompute SQL框架调用,并且只会被调用一次,即在处理完最后一条记录之后。
#coding:utf-8
# explode.py
from odps.udf import annotate
from odps.udf import BaseUDTF
@annotate('string -> string')
class Explode(BaseUDTF):
#将string按逗号分隔输出成多条记录。
def process(self, arg):
props = arg.split(',')
for p in props:
self.forward(p)
annotate
指定参数类型和返回值类型。这样,在SQL中使用函数时可以匹配任意输入参数,但无法推导返回值类型,所有输出参数都将被视为STRING类型。因此在调用forward
时,必须将所有输出值转换为STRING类型。
引用资源
Python UDF可以通过odps.distcache
模块引用资源文件,支持引用文件资源和表资源。
odps.distcache.get_cache_file(resource_name)
:返回指定名字的资源内容。resource_name
为STRING类型,对应当前项目空间中已存在的资源名。如果资源名非法或者没有相应的资源,则会返回异常。说明 使用UDF访问资源,在创建UDF时需要声明引用的资源,否则会报错。- 返回值为File-like对象。在使用完此对象后,您需要调用
close
方法释放打开的资源文件。
示例如下。@annotate('bigint->string') class DistCacheExample(object): def __init__(self): cache_file = get_cache_file('test_distcache.txt') kv = {} for line in cache_file: line = line.strip() if not line: continue k, v = line.split() kv[int(k)] = v cache_file.close() self.kv = kv def evaluate(self, arg): return self.kv.get(arg)
odps.distcache.get_cache_table(resource_name)
:返回指定资源表的内容。resource_name
支持STRING类型,对应当前Project中已存在的资源表名。如果资源名非法或者没有相应的资源,会返回异常。- 返回值为GENERATOR类型,调用者通过遍历获取表的内容,每次遍历可得到以数组形式存在的表中的一条记录。
示例如下。from odps.udf import annotate from odps.distcache import get_cache_table @annotate('->string') class DistCacheTableExample(object): def __init__(self): self.records = list(get_cache_table('udf_test')) self.counter = 0 self.ln = len(self.records) def evaluate(self): if self.counter > self.ln - 1: return None ret = self.records[self.counter] self.counter += 1 return str(ret)
UDTF读取MaxCompute资源示例
在Python UDTF中,您可以读取MaxCompute的资源。利用UDTF读取MaxCompute资源的操作流程示例如下:
- 编写Python代码,并保存为py_udtf_example.py文件。代码示例如下:
# -*- coding: utf-8 -*- from odps.udf import annotate from odps.udf import BaseUDTF from odps.distcache import get_cache_file from odps.distcache import get_cache_table @annotate('string -> string, bigint') class UDTFExample(BaseUDTF): """读取资源文件和资源表里的pageid、adid,生成dict """ def __init__(self): import json cache_file = get_cache_file('test_json.txt') self.my_dict = json.load(cache_file) cache_file.close() records = list(get_cache_table('table_resource1')) for record in records: self.my_dict[record[0]] = [record[1]] """输入pageid,输出pageid以及它对应的所有adid """ def process(self, pageid): for adid in self.my_dict[pageid]: self.forward(pageid, adid)
- 准备资源文件test_json.txt、创建资源表table_resource1和内部表tmp1(后续执行DML操作写入的目标表)并插入数据。命令示例如下:
- 资源文件test_json.txt的内容如下:
{"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}
- 创建资源表table_resource1,并插入数据。
create table if not exists table_resource1 (pageid string, adid int); insert into table table_resource1 values("contact_page2",2),("contact_page3",5);
- 创建内部表tmp1,并插入数据。
create table if not exists tmp1 (pageid string); insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");
- 资源文件test_json.txt的内容如下:
- 登录MaxCompute客户端将py_udtf_example.py文件、test_json.txt和表table_resource1添加为MaxCompute的资源。
更多添加资源信息,请参见添加资源。命令示例如下:
add py py_udtf_example.py; add file test_json.txt; add table table_resource1 as table_resource1;
- 通过MaxCompute客户端,创建UDTF函数my_udtf。
更多创建函数信息,请参见注册函数。命令示例如下:
create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';
- 通过MaxCompute客户端,基于新创建的UDTF函数,运行SQL语句。命令示例如下:
- 示例1:单纯使用UDTF函数运行SQL。
返回结果如下。select my_udtf(pageid) as (pageid, adid) from tmp1;
+------------+------------+ | pageid | adid | +------------+------------+ | front_page | 1 | | front_page | 2 | | front_page | 3 | | contact_page1 | 3 | | contact_page1 | 4 | | contact_page1 | 5 | | contact_page3 | 5 | +------------+------------+
- 示例2:对示例1中的命令改写,结合Lateral View运行SQL。
返回结果如下。select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;
+------------+------------+ | pageid | adid | +------------+------------+ | front_page | 1 | | front_page | 2 | | front_page | 3 | | contact_page1 | 3 | | contact_page1 | 4 | | contact_page1 | 5 | | contact_page3 | 5 | +------------+------------+
执行结果的两列分别为tmp1.pageid和UDTF输出的adid。
- 示例3:结合聚合函数和Lateral View运行SQL。
返回结果如下。--聚合统计每个adid出现的次数。 select adid, count(1) as cnt from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid group by adid;
+------------+------------+ | adid | cnt | +------------+------------+ | 1 | 1 | | 2 | 1 | | 3 | 2 | | 4 | 1 | | 5 | 2 | +------------+------------+
- 示例1:单纯使用UDTF函数运行SQL。
在文档使用中是否遇到以下问题
更多建议
匿名提交