本文为您介绍PySpark开发示例。

如果要访问MaxCompute表,则需要编译datasource包,详细步骤请参见搭建开发环境

SparkSQL应用示例(Spark1.6)

详细代码
from pyspark import SparkContext, SparkConf
from pyspark.sql import OdpsContext
if __name__ == '__main__':
    conf = SparkConf().setAppName("odps_pyspark")
    sc = SparkContext(conf=conf)
    sql_context = OdpsContext(sc)
    sql_context.sql("DROP TABLE IF EXISTS spark_sql_test_table")
    sql_context.sql("CREATE TABLE spark_sql_test_table(name STRING, num BIGINT)")
    sql_context.sql("INSERT INTO TABLE spark_sql_test_table SELECT 'abc', 100000")
    sql_context.sql("SELECT * FROM spark_sql_test_table").show()
    sql_context.sql("SELECT COUNT(*) FROM spark_sql_test_table").show()
提交运行
./bin/spark-submit \
--jars cupid/odps-spark-datasource_xxx.jar \
example.py

SparkSQL应用示例(Spark2.3)

详细代码
from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark = SparkSession.builder.appName("spark sql").getOrCreate()
    spark.sql("DROP TABLE IF EXISTS spark_sql_test_table")
    spark.sql("CREATE TABLE spark_sql_test_table(name STRING, num BIGINT)")
    spark.sql("INSERT INTO spark_sql_test_table SELECT 'abc', 100000")
    spark.sql("SELECT * FROM spark_sql_test_table").show()
    spark.sql("SELECT COUNT(*) FROM spark_sql_test_table").show()
提交运行
  • Cluster模式提交运行
    spark-submit --master yarn-cluster \
    --jars cupid/odps-spark-datasource_xxx.jar \
    example.py
  • Local模式运行
    cd $SPARK_HOME
    ./bin/spark-submit --master local[4] \
    --driver-class-path cupid/odps-spark-datasource_xxx.jar \
    /path/to/odps-spark-examples/spark-examples/src/main/python/spark_sql.py
    说明
    • Local模式访问表依赖Tunnel。
    • Local模式要用--driver-class-path而非--jars

Package依赖

由于MaxCompute集群无法自由安装Python库,PySpark依赖其它Python库、插件、项目时,通常需要在本地打包后通过Spark-submit上传。对于特定依赖,打包环境需与线上环境保持一致。打包方式如下,请根据业务的复杂度进行选择:
  • 不打包直接采用公共资源
    • 默认提供Python 2.7.13环境配置
      spark.hadoop.odps.cupid.resources = public.python-2.7.13-ucs4.tar.gz
      spark.pyspark.python = ./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
      第三方库列表如下。
      $./bin/pip list
      Package                       Version
      ----------------------------- -----------
      absl-py                       0.11.0
      aenum                         2.2.4
      asn1crypto                    0.23.0
      astor                         0.8.1
      astroid                       1.6.1
      atomicwrites                  1.4.0
      attrs                         20.3.0
      backports.functools-lru-cache 1.6.1
      backports.lzma                0.0.14
      backports.weakref             1.0.post1
      beautifulsoup4                4.9.3
      bleach                        2.1.2
      boto                          2.49.0
      boto3                         1.9.147
      botocore                      1.12.147
      bz2file                       0.98
      cachetools                    3.1.1
      category-encoders             2.2.2
      certifi                       2019.9.11
      cffi                          1.11.2
      chardet                       3.0.4
      click                         6.7
      click-plugins                 1.1.1
      cligj                         0.7.0
      cloudpickle                   0.5.3
      configparser                  4.0.2
      contextlib2                   0.6.0.post1
      cryptography                  2.6.1
      cssutils                      1.0.2
      cycler                        0.10.0
      Cython                        0.29.5
      dask                          0.18.1
      DBUtils                       1.2
      decorator                     4.2.1
      docutils                      0.16
      entrypoints                   0.2.3
      enum34                        1.1.10
      fake-useragent                0.1.11
      Fiona                         1.8.17
      funcsigs                      1.0.2
      functools32                   3.2.3.post2
      future                        0.16.0
      futures                       3.3.0
      gast                          0.2.2
      gensim                        3.8.3
      geopandas                     0.6.3
      getpass3                      1.2
      google-auth                   1.23.0
      google-auth-oauthlib          0.4.1
      google-pasta                  0.2.0
      grpcio                        1.33.2
      h5py                          2.7.0
      happybase                     1.1.0
      html5lib                      1.0.1
      idna                          2.10
      imbalanced-learn              0.4.3
      imblearn                      0.0
      importlib-metadata            2.0.0
      ipaddress                     1.0.23
      ipython-genutils              0.2.0
      isort                         4.3.4
      itchat                        1.3.10
      itsdangerous                  0.24
      jedi                          0.11.1
      jieba                         0.42.1
      Jinja2                        2.10
      jmespath                      0.10.0
      jsonschema                    2.6.0
      kafka-python                  1.4.6
      kazoo                         2.5.0
      Keras-Applications            1.0.8
      Keras-Preprocessing           1.1.2
      kiwisolver                    1.1.0
      lazy-object-proxy             1.3.1
      libarchive-c                  2.8
      lightgbm                      2.3.1
      lml                           0.0.2
      lxml                          4.2.1
      MarkupSafe                    1.0
      matplotlib                    2.2.5
      mccabe                        0.6.1
      missingno                     0.4.2
      mistune                       0.8.3
      mock                          2.0.0
      more-itertools                5.0.0
      munch                         2.5.0
      nbconvert                     5.3.1
      nbformat                      4.4.0
      networkx                      2.1
      nose                          1.3.7
      numpy                         1.16.1
      oauthlib                      3.1.0
      opt-einsum                    2.3.2
      packaging                     20.4
      pandas                        0.24.2
      pandocfilters                 1.4.2
      parso                         0.1.1
      pathlib2                      2.3.5
      patsy                         0.5.1
      pbr                           3.1.1
      pexpect                       4.4.0
      phpserialize                  1.3
      pickleshare                   0.7.4
      Pillow                        6.2.0
      pip                           20.2.4
      pluggy                        0.13.1
      ply                           3.11
      prompt-toolkit                2.0.1
      protobuf                      3.6.1
      psutil                        5.4.3
      psycopg2                      2.8.6
      ptyprocess                    0.5.2
      py                            1.9.0
      py4j                          0.10.6
      pyasn1                        0.4.8
      pyasn1-modules                0.2.8
      pycosat                       0.6.3
      pycparser                     2.18
      pydot                         1.4.1
      Pygments                      2.2.0
      pykafka                       2.8.0
      pylint                        1.8.2
      pymongo                       3.11.0
      PyMySQL                       0.10.1
      pynliner                      0.8.0
      pyodps                        0.9.3.1
      pyOpenSSL                     17.5.0
      pyparsing                     2.2.0
      pypng                         0.0.20
      pyproj                        2.2.2
      PyQRCode                      1.2.1
      pytest                        4.6.11
      python-dateutil               2.8.1
      pytz                          2020.4
      PyWavelets                    0.5.2
      PyYAML                        3.12
      redis                         3.2.1
      requests                      2.25.0
      requests-oauthlib             1.3.0
      rope                          0.10.7
      rsa                           4.5
      ruamel.ordereddict            0.4.15
      ruamel.yaml                   0.11.14
      s3transfer                    0.2.0
      scandir                       1.10.0
      scikit-image                  0.14.0
      scikit-learn                  0.20.3
      scipy                         1.2.3
      seaborn                       0.9.1
      Send2Trash                    1.5.0
      setuptools                    41.0.0
      Shapely                       1.7.1
      simplegeneric                 0.8.1
      singledispatch                3.4.0.3
      six                           1.15.0
      sklearn2                      0.0.13
      smart-open                    1.8.1
      soupsieve                     1.9.6
      SQLAlchemy                    1.3.20
      statsmodels                   0.11.0
      subprocess32                  3.5.4
      tabulate                      0.8.7
      tensorflow                    2.0.0
      tensorflow-estimator          2.0.1
      termcolor                     1.1.0
      testpath                      0.3.1
      thriftpy                      0.3.9
      timeout-decorator             0.4.1
      toolz                         0.9.0
      tqdm                          4.32.2
      traitlets                     4.3.2
      urllib3                       1.24.3
      wcwidth                       0.2.5
      webencodings                  0.5.1
      Werkzeug                      1.0.1
      wheel                         0.35.1
      wrapt                         1.11.1
      xgboost                       0.82
      xlrd                          1.2.0
      XlsxWriter                    1.0.7
      zipp                          1.2.0
    • 默认提供Python 3.7.9环境配置
      spark.hadoop.odps.cupid.resources = public.python-3.7.9-ucs4.tar.gz
      spark.pyspark.python = ./public.python-3.7.9-ucs4.tar.gz/python-3.7.9-ucs4/bin/python3
      第三方库列表如下。
      Package                       Version
      ----------------------------- -----------
      appnope                       0.1.0
      asn1crypto                    0.23.0
      astroid                       1.6.1
      attrs                         20.3.0
      autopep8                      1.3.4
      backcall                      0.2.0
      backports.functools-lru-cache 1.5
      backports.weakref             1.0rc1
      beautifulsoup4                4.6.0
      bidict                        0.17.3
      bleach                        2.1.2
      boto                          2.49.0
      boto3                         1.9.147
      botocore                      1.12.147
      bs4                           0.0.1
      bz2file                       0.98
      cached-property               1.5.2
      cachetools                    3.1.1
      category-encoders             2.2.2
      certifi                       2019.11.28
      cffi                          1.11.2
      chardet                       3.0.4
      click                         6.7
      click-plugins                 1.1.1
      cligj                         0.7.0
      cloudpickle                   0.5.3
      cryptography                  2.6.1
      cssutils                      1.0.2
      cycler                        0.10.0
      Cython                        0.29.21
      dask                          0.18.1
      DBUtils                       1.2
      decorator                     4.2.1
      docutils                      0.16
      entrypoints                   0.2.3
      fake-useragent                0.1.11
      Fiona                         1.8.17
      future                        0.16.0
      gensim                        3.8.3
      geopandas                     0.8.0
      getpass3                      1.2
      h5py                          3.1.0
      happybase                     1.1.0
      html5lib                      1.0.1
      idna                          2.10
      imbalanced-learn              0.4.3
      imblearn                      0.0
      importlib-metadata            2.0.0
      iniconfig                     1.1.1
      ipykernel                     5.3.4
      ipython                       7.19.0
      ipython-genutils              0.2.0
      isort                         4.3.4
      itchat                        1.3.10
      itsdangerous                  0.24
      jedi                          0.11.1
      jieba                         0.42.1
      Jinja2                        2.10
      jmespath                      0.10.0
      jsonschema                    2.6.0
      jupyter-client                6.1.7
      jupyter-core                  4.6.3
      kafka-python                  1.4.6
      kazoo                         2.5.0
      kiwisolver                    1.3.1
      lazy-object-proxy             1.3.1
      libarchive-c                  2.8
      lightgbm                      2.3.1
      lml                           0.0.2
      lxml                          4.2.1
      Mako                          1.0.10
      MarkupSafe                    1.0
      matplotlib                    3.3.3
      mccabe                        0.6.1
      missingno                     0.4.2
      mistune                       0.8.3
      mock                          2.0.0
      munch                         2.5.0
      nbconvert                     5.3.1
      nbformat                      4.4.0
      networkx                      2.1
      nose                          1.3.7
      numpy                         1.19.4
      packaging                     20.4
      pandas                        1.1.4
      pandocfilters                 1.4.2
      parso                         0.1.1
      patsy                         0.5.1
      pbr                           3.1.1
      pexpect                       4.4.0
      phpserialize                  1.3
      pickleshare                   0.7.4
      Pillow                        6.2.0
      pip                           20.2.4
      plotly                        4.12.0
      pluggy                        0.13.1
      ply                           3.11
      prompt-toolkit                2.0.1
      protobuf                      3.6.1
      psutil                        5.4.3
      psycopg2                      2.8.6
      ptyprocess                    0.5.2
      py                            1.9.0
      py4j                          0.10.6
      pycodestyle                   2.3.1
      pycosat                       0.6.3
      pycparser                     2.18
      pydot                         1.4.1
      Pygments                      2.2.0
      pykafka                       2.8.0
      pylint                        1.8.2
      pymongo                       3.11.0
      PyMySQL                       0.10.1
      pynliner                      0.8.0
      pyodps                        0.9.3.1
      pyOpenSSL                     17.5.0
      pyparsing                     2.2.0
      pypng                         0.0.20
      pyproj                        3.0.0.post1
      PyQRCode                      1.2.1
      pytest                        6.1.2
      python-dateutil               2.8.1
      pytz                          2020.4
      PyWavelets                    0.5.2
      PyYAML                        3.12
      pyzmq                         17.0.0
      qtconsole                     4.3.1
      redis                         3.2.1
      requests                      2.25.0
      retrying                      1.3.3
      rope                          0.10.7
      ruamel.yaml                   0.16.12
      ruamel.yaml.clib              0.2.2
      s3transfer                    0.2.0
      scikit-image                  0.14.0
      scikit-learn                  0.20.3
      scipy                         1.5.4
      seaborn                       0.11.0
      Send2Trash                    1.5.0
      setuptools                    41.0.0
      Shapely                       1.7.1
      simplegeneric                 0.8.1
      six                           1.15.0
      sklearn2                      0.0.13
      smart-open                    1.8.1
      SQLAlchemy                    1.3.20
      statsmodels                   0.12.1
      tabulate                      0.8.7
      testpath                      0.3.1
      thriftpy                      0.3.9
      timeout-decorator             0.4.1
      toml                          0.10.2
      toolz                         0.9.0
      tornado                       6.1
      tqdm                          4.32.2
      traitlets                     4.3.2
      urllib3                       1.24.3
      wcwidth                       0.2.5
      webencodings                  0.5.1
      wheel                         0.35.1
      wrapt                         1.11.1
      xgboost                       1.2.1
      xlrd                          1.2.0
      XlsxWriter                    1.0.7
      zipp                          3.4.0
  • 上传单个WHEEL包
    如果依赖较为简单,则可以只上传单个WHEEL包,通常需要选用manylinux版本。使用方式如下:
    1. 将WHEEL包重命名为ZIP包,例如将pymysql的WHEEL包重命名为pymysql.zip。
    2. 将重命名后的ZIP包上传,文件类型为ARCHIVE。
    3. 在DataWorks Spark节点引用,文件类型为ARCHIVE。
    4. 在代码中修改环境变量后即可导入。
      sys.path.append('pymysql')
      import pymysql
  • 利用脚本一键打包
    若需要的额外依赖较多,上传单个WHEEL包会导致重复操作量倍增。您可以下载脚本,只需提供一个编辑好的requirements文件,就能够直接生成完整的Python环境用于PySpark使用,具体如下。
    • 使用
      $ chmod +x generate_env_pyspark.sh
      $ generate_env_pyspark.sh -h
      Usage:
      generate_env_pyspark.sh [-p] [-r] [-t] [-c] [-h]
      Description:
      -p ARG, the version of python, currently supports python 2.7, 3.5, 3.6 and 3.7 versions.
      -r ARG, the local path of your python requirements.
      -t ARG, the output directory of the gz compressed package.
      -c, clean mode, we will only package python according to your requirements, without other pre-provided dependencies.
      -h, display help of this script.
    • 示例
      # 带有预装依赖的打包方式
      $ generate_env_pyspark.sh -p 3.7 -r your_path_to_requirements -t your_output_directory
      
      # 不带预装依赖的打包方式(clean mode)
      generate_env_pyspark.sh -p 3.7 -r your_path_to_requirements -t your_output_directory -c
    • 说明
      • 脚本适用于Mac或Linux环境,需要预先安装Docker,安装指导请参见Docker帮助文档
      • 目前仅支持python 2.7、3.5、3.6和3.7版本,如果对Python版本不敏感,推荐使用Python 3.7。
      • -c选项表示是否开启clean mode。clean mode无法使用预装依赖,但输出的Python包更小。各版本的依赖请参见Python 2.7预装依赖Python 3.5预装依赖Python 3.6预装依赖Python 3.7预装依赖
      • 当前MaxCompute对上传资源的大小有500 MB的限制,因此如果大部分预装依赖用不到,推荐使用-c选项打包。
    • Spark中使用
      generate_env_pyspark.sh脚本的输出为在指定目录下(-t选项)生成指定Python版本(-p选项)的GZ包。以Python3.7为例,将生成py37.tar.gz,后续再将此包上传为ARCHIVE资源。您可以通过MaxCompute客户端上传,也可以使用odps-sdk上传。各种资源操作,请参见资源操作。以MaxCompute客户端为例,使用方式如下。
      1. 在MaxCompute客户端中执行如下命令添加资源。
        add archive /your/path/to/py37.tar.gz -f;
      2. 在Spark配置中增加如下两个参数。
        spark.hadoop.odps.cupid.resources = your_project.py37.tar.gz
        spark.pyspark.python = your_project.py37.tar.gz/bin/python
        若上述两个参数不生效,还需在Spark作业中增加如下两项配置。例如使用zeppelin调试Pyspark时,notebook中的Python环境配置。
        spark.yarn.appMasterEnv.PYTHONPATH = ./your_project.py37.tar.gz/bin/python
        spark.executorEnv.PYTHONPATH = ./your_project.py37.tar.gz/bin/python
  • 利用Docker容器打包Python环境
    该方式适用于如下场景:
    • 需要引入的依赖包含so文件时,无法通过上述ZIP文件的方式使用,无法进行pip install安装。
    • 对除2.7、3.5、3.6、3.7以外的Python版本有特殊需求。
    针对以上特殊情况,同时保证打包环境与线上环境一致(Mac打出来的Python环境与线上环境不兼容)。以Python3.7为例,基于Docker的打包步骤如下。
    1. 在安装Docker环境的宿主机新建一个Dockerfile文件。
      • Python 3示例参考如下。
        FROM centos:7.6.1810
        RUN set -ex \
            # 预安装所需组件
            && yum install -y wget tar libffi-devel zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make initscripts zip\
            && wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz \
            && tar -zxvf Python-3.7.0.tgz \
            && cd Python-3.7.0 \
            && ./configure prefix=/usr/local/python3 \
            && make \
            && make install \
            && make clean \
            && rm -rf /Python-3.7.0* \
            && yum install -y epel-release \
            && yum install -y python-pip
        # 设置默认为python3
        RUN set -ex \
            # 备份旧版本python
            && mv /usr/bin/python /usr/bin/python27 \
            && mv /usr/bin/pip /usr/bin/pip-python27 \
            # 配置默认为python3
            && ln -s /usr/local/python3/bin/python3.7 /usr/bin/python \
            && ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
        # 修复因修改python版本导致yum失效问题
        RUN set -ex \
            && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/bin/yum \
            && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/libexec/urlgrabber-ext-down \
            && yum install -y deltarpm
        # 更新pip版本
        RUN pip install --upgrade pip
      • Python 3示例参考如下。
        FROM centos:7.6.1810
        RUN set -ex \
            # 预安装所需组件
            && yum install -y wget tar libffi-devel zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make initscripts zip\
            && wget https://www.python.org/ftp/python/2.7.18/Python-2.7.18.tgz \
            && tar -zxvf Python-2.7.18.tgz \
            && cd Python-2.7.18 \
            && ./configure prefix=/usr/local/python2 \
            && make \
            && make install \
            && make clean \
            && rm -rf /Python-2.7.18*
        # 设置默认为python
        RUN set -ex \
            && mv /usr/bin/python /usr/bin/python27 \
            && ln -s /usr/local/python2/bin/python /usr/bin/python
        RUN set -ex \
            && wget https://bootstrap.pypa.io/get-pip.py \
            && python get-pip.py
        RUN set -ex \
            && rm -rf /usr/bin/pip \
            && ln -s /usr/local/python2/bin/pip /usr/bin/pip
        # 修复因修改python版本导致yum失效问题
        RUN set -ex \
            && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/bin/yum \
            && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/libexec/urlgrabber-ext-down \
            && yum install -y deltarpm
        # 更新pip版本
        RUN pip install --upgrade pip
                                                    
    2. 构建镜像并运行容器。
      # 在Dockerfile文件的目录下运行如下命令。
      docker build -t python-centos:3.7 
      docker run -itd --name python3.7 python-centos:3.7
    3. 进入容器安装所需的Python依赖库。
      docker attach python3.7
      pip install [所需依赖库]
    4. 打包Python环境。
      cd /usr/local/
      zip -r python3.7.zip python3/
    5. 拷贝容器中的Python环境到宿主机。
      # 退出容器
      ctrl+P+Q
      # 在宿主机运行命令。
      docker cp python3.7:/usr/local/python3.7.zip 
    6. 上传Python3.7.zip包为MaxCompute资源。DataWorks最大只能上传50 MB的包,如果大于50 MB可以通过MaxCompute客户端上传,文件类型为ARCHIVE。上传资源操作,请参见添加资源
      add archive /path/to/python3.7.zip -f;
    7. 提交作业时只需要在spark-default.conf或DataWorks配置项中添加以下配置即可。
      spark.hadoop.odps.cupid.resources=[project名称].python3.7.zip
      spark.pyspark.python=./[project名].python3.7.zip/python3/bin/python3.7
      说明 通过Docker容器打包,如果遇到so包找不到的情况,则需要手动将so包放到Python环境中。一般so包在容器中都能找到,并在Spark作业中添加以下环境变量。
      spark.executorEnv.LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./[project名].python3.7.zip/python3/[创建的so包目录]
      spark.yarn.appMasterEnv.LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./[project名].python3.7.zip/python3/[创建的so包目录]
  • 引用用户自定义的Python包
    通常情况下,用户需要使用自定义的Python文件,可以打包提交,这样避免了上传多个PY文件,步骤如下:
    1. 将用户代码打包为ZIP包,需要在目录下自定义个一个空白的__init__.py。
    2. 将用户代码ZIP包以MaxCompute资源形式上传,并重命名,该资源在工作目录中将会被解压。
    3. 配置参数spark.executorEnv.PYTHONPATH=
    4. 完成上述步骤,主Python文件就可以导入该目录下的Python文件。