查询加速MCQA(MaxCompute Query Acceleration)是MaxCompute内建的功能,使用原生的MaxCompute SQL语言,并且支持所有的MaxCompute内建函数以及权限系统。本文为您介绍如何使用MCQA功能。

背景信息

MCQA功能支持的接入方式如下:

MaxCompute客户端

您可以通过MaxCompute客户端开启MCQA功能,操作步骤如下:

  1. 下载最新版客户端(odpscmd)。
    说明 客户端需要为0.35.1及以上版本。
  2. 安装并配置客户端,详情请参见安装并配置客户端
  3. 修改客户端安装目录conf下的配置文件odps_config.ini,在配置文件最后一行增加如下命令行。
    enable_interactive_mode=true
  4. 运行客户端安装目录bin下的MaxCompute客户端(Linux系统下运行./bin/odpscmd,Windows下运行./bin/odpscmd.bat)。出现如下信息,表示运行成功。
    客户端运行成功
  5. 执行查询作业验证MCQA功能已开启。
    执行查询作业后,客户端界面返回结果如果包含如下信息表明MCQA功能已开启。验证加速查询

DataWorks临时查询或数据开发

DataWorks的临时查询手动业务流程模块默认开启MCQA功能,您无需手动开启。如果您需要关闭MCQA功能,请提工单处理。

临时查询模块执行查询作业,如果返回结果包含如下信息表明MCQA功能已开启。临时查询详情请参见创建临时查询临时查询
手动业务流程模块执行查询作业,返回结果包含如下信息表明MCQA功能已开启。手动业务流程详情请参见管理手动业务流程手动业务流程

JDBC

JDBC使用场景如下:

配置JDBC启用MCQA功能

使用JDBC连接MaxCompute时,您可以通过执行如下操作开启MCQA功能。使用JDBC连接MaxCompute的操作详情请参见JDBC使用说明

  1. 下载支持MCQA功能的JDBC JAR包或可编译的源代码
  2. 通过Maven方式配置Pom依赖。
    <dependency>
      <groupId>com.aliyun.odps</groupId>
      <artifactId>odps-jdbc</artifactId>
      <version>3.2.0</version>
      <classifier>jar-with-dependencies</classifier>
    </dependency>
    说明 版本需要为3.2.0及以上版本。
  3. 基于源代码创建Java程序,适配实际信息。详情请参见ODPS JDBC
    需要适配的信息如下。
    String accessId = "your_access_id";
    String accessKey = "your_access_key";
    Connection conn = DriverManager.getConnection("jdbc:odps:http://service.odps.aliyun.com/api?project=<your_project_name>"&accessId&accessKey&charset=UTF-8&interactiveMode=true");
    Statement stmt = conn.createStatement();
    --your_access_id、your_access_key为您的云账号信息,your_project_name为需要使用MCQA功能的项目名称。
    Connection conn = DriverManager.getConnection(conn, accessId, accessKey);
    Statement stmt = conn.createStatement();
    String tableName = "testOdpsDriverTable";
    stmt.execute("drop table if exists " + tableName);
    stmt.execute("create table " + tableName + " (key int, value string)");
    在JDBC中启用MCQA功能的方式与常规方式完全一致,只需要在连接串(Connection conn)或代码中(二选一即可)配置如下信息:
    • 连接串(Connection conn):增加interactiveMode=true属性。
    • 代码:增加interactiveMode=true语句。

基于JDBC配置Tableau启用MCQA功能

服务器增加interactiveMode=true属性,用于开启MCQA功能。建议您同步增加enableOdpsLogger=true属性,用于打印日志。配置操作详情请参见配置JDBC使用Tableau

完整的服务器配置示例如下。
http://service.cn-beijing.maxcompute.aliyun.com/api?project=****_beijing&interactiveMode=true&enableOdpsLogger=true
如果只对项目空间中的部分表进行Tableau操作,您可以在服务器参数中增加table_list=table_name1, table_name2属性选择需要的表,表之间用英文逗号(,)分隔。如果表过多,会导致Tableau打开缓慢,强烈建议使用此方式只载入需要的表。示例如下。
http://service.cn-beijing.maxcompute.aliyun.com/api?project=****_beijing&interactiveMode=true&enableOdpsLogger=true&table_list=orders,customers
对于有大量分区的表不建议把所有分区的数据都设置成数据源,可以筛选需要的分区,或通过自定义SQL获取需要的数据。

基于JDBC配置SQLWorkBench启用MCQA功能

完成JDBC驱动配置后,在Profile配置界面修改已填写的JDBC URL,支持SQLWorkBench使用MCQA功能。Profile配置操作详情请参见配置JDBC使用SQL Workbench/J

需要配置的URL格式为jdbc:odps:<MaxCompute_endpoint>?project=<MaxCompute_project_name>&accessId=<AccessKey ID>&accessKey=<AccessKey Secret>&charset=UTF-8&interactiveMode=true,参数说明如下:
  • maxcompute_endpoint:MaxCompute服务所在区域的Endpoint。详情请参见配置Endpoint
  • maxcompute_project_name:MaxCompute项目空间名称。
  • AccessKey ID:有访问指定项目空间权限的AccessKey ID。
  • AccessKey Secret:AccessKey ID对应的AccessKey Secret。
  • charset=UTF-8:字符集编码格式。
  • interactiveMode:MCQA功能开关,ture表示开启MCQA功能。

基于Java SDK启用MCQA功能

Java SDK详情请参见Java SDK介绍。您需要通过Maven配置Pom依赖,配置示例如下。
<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>odps-sdk-core</artifactId>
  <version>0.35.5-public</version>
</dependency>
说明 版本需要为0.35.1及以上版本。
创建Java程序,命令示例如下。
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.ResultSet;
import com.aliyun.odps.sqa.*;

import java.io.IOException;
import java.util.*;

public class SQLExecutorExample {

    public static void SimpleExample() {
        // 设置账号和项目信息。
        Account account = new AliyunAccount("your_access_id", "your_access_key");
        Odps odps = new Odps(account);
        odps.setDefaultProject("your_project_name");
        odps.setEndpoint("http://service.odps.aliyun.com/api");

        // 准备构建SQLExecutor。
        SQLExecutorBuilder builder = SQLExecutorBuilder.builder();

        SQLExecutor sqlExecutor = null;
        try {
            // run in offline mode or run in interactive mode
            if (false) {
                // 创建一个默认执行离线SQL的Executor。
                sqlExecutor = builder.odps(odps).executeMode(ExecuteMode.OFFLINE).build();
            } else {
                // 创建一个默认执行查询加速SQL的Executor。
                sqlExecutor = builder.odps(odps).executeMode(ExecuteMode.INTERACTIVE).build();
            }
            // 如果需要的话可以传入查询的特殊设置。
            Map<String, String> queryHint = new HashMap<>();
            queryHint.put("odps.sql.mapper.split.size", "128");
            // 提交一个查询作业,支持传入Hint。
            sqlExecutor.run("select count(1) from test_table;", queryHint);

            // 列举一些支持的常用获取信息的接口。
            // UUID
            System.out.println("ExecutorId:" + sqlExecutor.getId());
            // 当前查询作业的logview。
            System.out.println("Logview:" + sqlExecutor.getLogView());
            // 当前查询作业的Instance对象(Interactive模式多个查询作业可能为同一个Instance)。
            System.out.println("InstanceId:" + sqlExecutor.getInstance().getId());
            // 当前查询作业的阶段进度(Console的进度条)
            System.out.println("QueryStageProgress:" + sqlExecutor.getProgress());
            // 当前查询作业的执行状态变化日志,例如回退信息。
            System.out.println("QueryExecutionLog:" + sqlExecutor.getExecutionLog());

            // 提供两种获取结果的接口。
            if(false) {
                // 直接获取全部查询作业结果,同步接口,可能会占用本线程直到查询成功或失败。
                List<Record> records = sqlExecutor.getResult();
                printRecords(records);
            } else {
                // 获取查询结果的迭代器ResultSet,同步接口,可能会占用本线程直到查询成功或失败。
                ResultSet resultSet = sqlExecutor.getResultSet();
                while (resultSet.hasNext()) {
                    printRecord(resultSet.next());
                }
            }

            // run another query
            sqlExecutor.run("select * from test_table;", new HashMap<>());
            if(false) {
                // 直接获取全部查询结果,同步接口,可能会占用本线程直到查询成功或失败。
                List<Record> records = sqlExecutor.getResult();
                printRecords(records);
            } else {
                // 获取查询结果的迭代器ResultSet,同步接口,可能会占用本线程直到查询成功或失败。
                ResultSet resultSet = sqlExecutor.getResultSet();
                while (resultSet.hasNext()) {
                    printRecord(resultSet.next());
                }
            }
        } catch (OdpsException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (sqlExecutor != null) {
                // 关闭Executor释放相关资源。
                sqlExecutor.close();
            }
        }
    }

    // SQLExecutor can be reused by pool mode
    public static void ExampleWithPool() {
        // 设置账号和项目信息。
        Account account = new AliyunAccount("your_access_id", "your_access_key");
        Odps odps = new Odps(account);
        odps.setDefaultProject("your_project_name");
        odps.setEndpoint("http://service.odps.aliyun.com/api");

        // 通过连接池方式执行查询。
        SQLExecutorPool sqlExecutorPool = null;
        SQLExecutor sqlExecutor = null;
        try {
            // 准备连接池,设置连接池大小和默认执行模式。
            SQLExecutorPoolBuilder builder = SQLExecutorPoolBuilder.builder();
            builder.odps(odps)
                    .initPoolSize(1) // init pool executor number
                    .maxPoolSize(5)  // max executors in pool
                    .executeMode(ExecuteMode.INTERACTIVE); // run in interactive mode

            sqlExecutorPool = builder.build();
            // 从连接池中获取一个Executor,如果不够将会在Max限制内新增Executor。
            sqlExecutor = sqlExecutorPool.getExecutor();

            // Executor具体用法和上一示例一致。
            sqlExecutor.run("select count(1) from test_table;", new HashMap<>());
            System.out.println("InstanceId:" + sqlExecutor.getId());
            System.out.println("Logview:" + sqlExecutor.getLogView());

            List<Record> records = sqlExecutor.getResult();
            printRecords(records);
        } catch (OdpsException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            sqlExecutor.close();
        }
        sqlExecutorPool.close();
    }

    private static void printRecord(Record record) {
        for (int k = 0; k < record.getColumnCount(); k++) {

            if (k != 0) {
                System.out.print("\t");
            }

            if (record.getColumns()[k].getType().equals(OdpsType.STRING)) {
                System.out.print(record.getString(k));
            } else if (record.getColumns()[k].getType().equals(OdpsType.BIGINT)) {
                System.out.print(record.getBigint(k));
            } else {
                System.out.print(record.get(k));
            }
        }
    }

    private static void printRecords(List<Record> records) {
        for (Record record : records) {
            printRecord(record);
            System.out.println();
        }
    }

    public static void main(String args[]) {
        SimpleExample();
        ExampleWithPool();
    }
}