DataWorks提供PyODPS节点类型,集成了MaxCompute的Python SDK。您可以在DataWorks的PyODPS节点上,直接编辑Python代码,用于操作MaxCompute。

MaxCompute提供了Python SDK方法说明,您可以使用Python的SDK来操作MaxCompute。
说明
  • PyODPS节点底层的Python版本为2.7。
  • 推荐通过SQL或者Dataframe的方式处理数据,详情请参见DataFrame概述。不建议您直接调用pandas等第三方包来处理数据。
  • PyODPS节点获取到本地处理的数据不能超过50MB,节点运行时占用的内存不能超过1G,否则节点任务会结束运行。请避免在PyODPS节点中写入过多的数据处理代码。
  • Hints参数的详情请参见SET操作

PyODPS节点主要针对MaxCompute的Python SDK应用。对于纯Python代码的执行,您可以使用Shell节点执行上传至DataWorks的Python脚本。

PyODPS操作实践请参见使用MaxCompute分析IP来源最佳实践PyODPS节点实现结巴中文分词,更多信息请参见PyODPS文档

新建PyODPS节点

  1. 登录DataWorks控制台,单击相应工作空间后的进入数据开发
  2. 鼠标悬停至新建,单击MaxCompute > PyODPS

    您也可以打开相应的业务流程,右键单击MaxCompute,选择新建 > PyODPS

  3. 新建节点对话框中,输入节点名称,并选择目标文件夹,单击提交
    说明 节点名称的长度不能超过128个字符。
  4. 进入PyODPS编辑页面进行编辑。
    您可以在PyODPS节点的编辑页面进行保存提交等操作,详情请参见PyODPS节点界面功能点
    1. ODPS入口。
      DataWorks的PyODPS节点中,将会包含一个全局的变量odpso,即ODPS入口,您无需手动定义ODPS入口。
      print(odps.exist_table('PyODPS_iris'))
    2. 执行SQL。

      PyODPS支持ODPS SQL的查询,并可以读取执行的结果。execute_sql或run_sql方法的返回值是运行实例。

      并非所有在MaxCompute客户端中可以执行的命令,都是PyODPS支持的SQL语句。调用非DDL或非DML语句时,请使用其它方法。

      例如,执行GRANT、REVOKE等语句时,请使用run_security_query方法。PAI命令请使用run_xflowexecute_xflow方法。
      o.execute_sql('select * from dual')  #  同步的方式执行,会阻塞直到SQL执行完成。
      instance = o.run_sql('select * from dual')  # 异步的方式执行。
      print(instance.get_logview_address())  # 获取logview地址。
      instance.wait_for_success()  # 阻塞直到完成。
    3. 设置运行参数。
      您可以通过设置hints参数,来设置运行时的参数,参数类型是dict
      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
      对全局配置设置sql.settings后,每次运行时,都需要添加相关的运行时的参数。
      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # 根据全局配置添加hints。
    4. 读取SQL执行结果。
      运行SQL的instance能够直接执行open_reader的操作,有以下两种情况:
      • SQL返回了结构化的数据。
        with o.execute_sql('select * from dual').open_reader() as reader:
        for record in reader:  # 处理每一个record。
      • 可能执行的是desc等SQL语句,通过reader.raw属性,获取到原始的SQL执行结果。
        with o.execute_sql('desc dual').open_reader() as reader:
        print(reader.raw)
        说明 如果使用了自定义调度参数,页面上直接触发运行PyODPS节点时,需要写死时间,PyODPS节点无法直接替换。
  5. 节点调度配置。
    单击节点编辑区域右侧的调度配置,即可进入节点调度配置页面,详情请参见调度配置模块。
    1. PyODPS节点使用调度参数时,如果使用系统定义的调度参数,可以直接在页面赋值获取。 独享
      说明 由于默认资源组无法直接访问外网环境,建议您有公网访问需求时,使用自定义资源组或独享调度资源。仅专业版DataWorks提供自定义资源组,任意版本均可购买独享调度资源,详情请参见DataWorks独享资源
    2. 赋值完成后,提交节点并进入运维中心页面,进行测试运行,即可查看赋值结果。测试
    3. 您可以在调度配置 > 基础属性中,配置自定义参数。基础属性
      说明 自定义参数需要使用args['参数名']的形式调用,例如print (args['ds'])
  6. 提交节点任务。

    完成调度配置后,单击左上角的保存,提交(提交并解锁)到开发环境。

  7. 发布节点任务。

    具体操作请参见发布管理

  8. 在生产环境测试。

    具体操作请参见周期任务

PyODPS节点预装模块列表

PyODPS节点包括以下预装模块:
  • setuptools
  • cython
  • psutil
  • pytz
  • dateutil
  • requests
  • pyDes
  • numpy
  • pandas
  • scipy
  • scikit_learn
  • greenlet
  • six
  • 其它Python 2.7内置已安装的模块,如smtplib等。