本文为您介绍如何在DataWorks上使用PyODPS。

功能入口

您可以进入DataWorks的数据开发页面创建PyODPS节点。

PyODPS节点分为PyODPS 2和PyODPS 3两种,二者的区别在于底层Python语言版本不同。PyODPS 2底层Python语言版本为Python 2,PyODPS 3底层Python语言版本为Python 3,请您根据实际使用的Python语言版本创建PyODPS节点。

更多创建PyODPS节点信息,请参见创建PyODPS 2节点创建PyODPS 3节点

使用示例,请参见PyODPS节点实现结巴中文分词

新建节点

使用限制

  • PyODPS节点获取本地处理的数据不能超过50 MB,节点运行时占用内存不能超过1 GB,否则节点任务会被系统中止。请避免在PyODPS任务中写额外的Python数据处理代码。
  • 在DataWorks上编写代码并进行调试效率较低,为提升运行效率,建议本地安装IDE进行代码开发。
  • 在DataWorks上使用PyODPS时,为了防止对DataWorks的Gate Way造成压力,对内存和CPU都有限制,该限制由DataWorks统一管理。如果您发现有Got killed报错,即表明内存使用超限,进程被中止。因此,请尽量避免本地的数据操作。通过PyODPS发起的SQL和DataFrame任务(除to_pandas外)不受此限制。
  • 由于缺少matplotlib等包,如下功能可能受限:
    • DataFrame的plot函数。
    • DataFrame自定义函数需要提交到MaxCompute执行。由于Python沙箱限制,第三方库只支持所有的纯粹Python库以及Numpy,因此不能直接使用Pandas。
    • DataWorks中执行的非自定义函数代码可以使用平台预装的Numpy和Pandas。不支持其他带有二进制代码的三方包。
  • 由于兼容性原因,在DataWorks中,options.tunnel.use_instance_tunnel默认设置为False。如果需要全局开启instance tunnel,需要手动将该值设置为True。
  • 由于实现的原因,Python的atexit包不被支持,请使用try-finally结构实现相关功能。

ODPS入口

DataWorks的PyODPS节点中,将会包含一个全局变量odps或者o,即为ODPS入口。您不需要手动定义ODPS入口。
print(o.exist_table('pyodps_iris'))

执行SQL

您可以在PyODPS节点中执行SQL,详情请参见SQL

Dataworks上默认未开启instance tunnel,即instance.open_reader默认使用Result接口(最多一万条记录)。您可以通过reader.count获取记录数。如果您需要迭代获取全部数据,则需要关闭limit限制。您可以通过下列语句在全局范围内打开Instance Tunnel并关闭limit限制。
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # 关闭limit限制,读取全部数据。

with instance.open_reader() as reader:
    # 通过Instance Tunnel可读取全部数据。

您也可以通过在open_reader上添加tunnel=True,实现仅对本次open_reader开启instance tunnel。同时,您还可以添加 limit=False,实现仅对本次关闭limit限制。

with instance.open_reader(tunnel=True, limit=False) as reader:
# 本次open_reader使用Instance Tunnel接口,且能读取全部数据。
说明 若您未开启Instance Tunnel,可能导致获取数据格式错误,解决方法请参见Python SDK

DataFrame

  • 执行

    在DataWorks的环境里,DataFrame的执行需要显式调用立即执行的方法

    from odps.df import DataFrame
    iris = DataFrame(o.get_table('pyodps_iris'))
    for record in iris[iris.sepal_width < 3].execute():  # 调用立即执行的方法,处理每条Record。

    如果您需要在Print时调用立即执行,需要开启options.interactive

    from odps import options
    from odps.df import DataFrame
    options.interactive = True  # 在开始处打开开关。
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepal_width.sum())  # Print时会立即执行。
  • 打印详细信息

    通过设置options.verbose选项。在DataWorks上,默认已经处于打开状态,运行过程会打印Logview等详细过程。

获取调度参数

与DataWorks中的SQL节点不同,为了避免影响代码,PyODPS节点不会在代码中替换 ${param_name}这样的字符串,而是在执行代码前,在全局变量中增加一个名为args的dict,调度参数可以在此获取。例如,在节点基本属性 > 参数中设置ds=${yyyymmdd},则可以通过以下方式在代码中获取该参数。

print('ds=' + args['ds'])
ds=20161116
说明 如果您需要获取名为ds=${yyyymmdd}的分区,则可以使用如下方法。
o.get_table('table_name').get_partition('ds=' + args['ds'])