本文为您介绍公共云环境下,使用实时计算Open API方式操作实时计算作业所需的POM依赖和Open API DEMO。

POM依赖

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-foas</artifactId>
    <version>2.2.0</version>
</dependency>
说明 若需要依赖aliyun-java-sdk-core,请使用4.3.0及以上版本。4.3.0版本依赖示例如下。
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.3.0</version>
</dependency>

OpenAPI DEMO

package sample;

import com.aliyuncs.AcsRequest;
import com.aliyuncs.AcsResponse;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.foas.model.v20181111.*;
import com.aliyuncs.http.FormatType;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import com.google.gson.Gson;

import java.util.HashMap;
import java.util.Map;

/**
 * 适用于公共云环境
 *
 * 作业的一般流程是:
 * 如果有依赖package,先创建package。
 * 创建Job->获取planjson->更新planjson到job->上线job->启动job->停止instance。
 * Instance的状态一般如下(流作业为例,括号里的为动作):
 * Job上线->UNKNOWN[启动job]->WAITING[等待]->RUNNING[暂停]->PAUSED[恢复]->RUNNING[停止]->TERMINATED
 *
 * Note:以下示例仅供演示foas接口的调用方式,不保证逻辑上的严谨性,请按照自己的场景重新组织调用逻辑。
 */
public class PublicSample {

    //填写购买的foas所在的regionId,根据您的实际情况修改。
    private static final String regionId = "cn-shanghai";

    //云账号AK,支持子账号,需要自行保证该账号具有访问实时计算服务的权限(子账号需要得到主账号访问实时计算服务的权限授权)。
    private static final String accessId = "<yourAccessId>";
    private static final String accessKey = "<yourAccessSecret>";

    private static final String projectName = "<yourProjectName>";
    private static final String jobName = "<yourJobName>";

    // 文件夹与linux文件系统格式相同,以 / 开始,表示根目录,只支持绝对路径,不支持相对路径。
    private static final String folderName = "/新手任务";
    private static final String clusterId = "<yourClusterId>";//可以通过listProjectBindQueue接口来获得。
    private static final String queueName = "root.<queueName>";//可以通过listProjectBindQueue接口来获得。
    private static final String engineVersion = "current";//Blink或者Flink的版本,current表示使用推荐版本,beta表示使用测试版本。
    private static final String packageName = "<packageName>";

    private static final String sql =
            "--SQL\n" +
                    "--********************************************************************--\n" +
                    "--CreateTime: 2018-12-29 14:35:04\n" +
                    "--Comment: 请输入业务注释信息\n" +
                    "--********************************************************************--\n" +
                    "\n" +
                    "CREATE TABLE datahub_source (`name` INT, num VARCHAR) WITH (\n" +
                    "    type = 'random'\n" +
                    ");\n" +
                    "CREATE TABLE datahub_sink (`name` INT, cnt BIGINT) WITH (\n" +
                    "    type = 'PRINT'\n" +
                    ");\n" +
                    "INSERT INTO\n" +
                    "    datahub_sink\n" +
                    "SELECT\n" +
                    "    `name`,\n" +
                    "    COUNT (1)\n" +
                    "FROM\n" +
                    "    datahub_source\n" +
                    "GROUP BY\n" +
                    "    `name`";

    private static final String properties =
            "#checkpoint模式:EXACTLY_ONCE或者AT_LEAST_ONCE\n" +
                    "blink.checkpoint.mode=EXACTLY_ONCE\n" +
                    "#checkpoint间隔时间,单位毫秒\n" +
                    "blink.checkpoint.interval.ms=180000\n" +
                    "#rocksdb的数据生命周期,单位毫秒\n" +
                    "#state.backend.rocksdb.ttl.ms=129600000\n";

    private static <T extends AcsResponse> T getResponse(IAcsClient client, AcsRequest<T> request) {
        AcsResponse response = null;
        try {
            //必须设置为json。
            request.setHttpContentType(FormatType.JSON);
            request.setAcceptFormat(FormatType.JSON);

            response = client.getAcsResponse(request);
        } catch (Exception e) {
            //处理异常的逻辑,请注意异常中的requestId字段,排查问题时,需要提供该值给服务端。
            if (e instanceof ServerException) {
                ServerException serverException = (ServerException) e;
                System.out.println(serverException.getRequestId());//本次请求的requestId。
                System.out.println(serverException.getErrCode());//本次请求失败的的错误码,用于排查错误。
            }
            System.out.println(e.getMessage());
        }
        return (T) response;
    }

    private static String getClusterMetricJson(String clusterId) {
        return String.format(
                        "{\n" +
                        "\t\"queries\": [ {\n" +
                        "\t\t\"metric\": \"%s.system.cpu.user\",\n" +
                        "\t\t\"aggregator\": \"avg\",\n" +
                        "\t\t\"granularity\": \"20s\",\n" +
                        "\t\t\"downsample\": \"avg\"\n" +
                        "\t}, {\n" +
                        "\t\t\"metric\": \"%s.system.cpu.iowait\",\n" +
                        "\t\t\"aggregator\": \"avg\",\n" +
                        "\t\t\"granularity\": \"20s\",\n" +
                        "\t\t\"downsample\": \"avg\"\n" +
                        "\t}],\n" +
                        "\t\"start\": %s,\n" +
                        "\t\"end\": %s,\n" +
                        "\t\"limit\": \"avg:sample:50\"\n" +
                        "}",
                clusterId, clusterId, System.currentTimeMillis() - 3600 * 1000, System.currentTimeMillis()
        );
    }

    public static void main(String[] args) throws Exception {

        IClientProfile profile = DefaultProfile.getProfile(regionId, accessId, accessKey);
        DefaultAcsClient client = new DefaultAcsClient(profile);
        client.setAutoRetry(false);//不进行自动重试。

        //创建一个独享集群,该接口是异步接口,调用返回后需要调用listCluster接口来获取集群的状态来判断是否创建完成。
        CreateClusterRequest createClusterRequest = new CreateClusterRequest();
        createClusterRequest.setDescription("test cluster");
        createClusterRequest.setDisplayName("aaaa");
        createClusterRequest.setOrderId("yyyyy");//填写购买独享集群下单后生成的订单ID。
        createClusterRequest.setUserOssBucket("myBucket");
        createClusterRequest.setUserVpcId("xxxxx");
        createClusterRequest.setUserVSwitch("yyyy");
        createClusterRequest.setZoneId("cn-shanghai-f");
        CreateClusterResponse createClusterResponse = PublicSample.getResponse(client, createClusterRequest);
        System.out.println(new Gson().toJson(createClusterResponse));//该response中含有clusterId信息,这个是集群的索引,后面的大部分接口都需要使用到。

        //根据条件搜索集群,不设置的条件留null。
        ListClusterRequest listClusterRequest = new ListClusterRequest();
        listClusterRequest.setClusterId(clusterId);
        listClusterRequest.setRegion(regionId);
        ListClusterResponse listClusterResponse = PublicSample.getResponse(client, listClusterRequest);
        System.out.println(new Gson().toJson(listClusterResponse));

        //在该集群上创建queue,只能创建一级queue。
        CreateQueueRequest createQueueRequest = new CreateQueueRequest();
        createQueueRequest.setClusterId(clusterId);
        createQueueRequest.setQueueName(queueName);
        createQueueRequest.setMaxVcore(300);
        createQueueRequest.setMaxMemMB(4 * 1024);
        CreateQueueResponse createQueueResponse = PublicSample.getResponse(client, createQueueRequest);
        System.out.println(new Gson().toJson(createQueueResponse));

        //获取集群的引擎版本信息。
        GetClusterEngineVersionsRequest getClusterEngineVersionsRequest = new GetClusterEngineVersionsRequest();
        getClusterEngineVersionsRequest.setClusterId(clusterId);
        GetClusterEngineVersionsResponse getClusterEngineVersionsResponse = PublicSample.getResponse(client, getClusterEngineVersionsRequest);
        System.out.println(new Gson().toJson(getClusterEngineVersionsResponse));

        //获取该集群的资源信息。
        GetClusterResourceRequest getClusterResourceRequest = new GetClusterResourceRequest();
        getClusterResourceRequest.setClusterId(clusterId);
        GetClusterResourceResponse getClusterResourceResponse = PublicSample.getResponse(client, getClusterResourceRequest);
        System.out.println(new Gson().toJson(getClusterResourceResponse));

        //获取集群的queue信息,包括所有的queue和各个queue的资源信息。
        GetClusterQueueInfoRequest getClusterQueueInfoRequest = new GetClusterQueueInfoRequest();
        getClusterQueueInfoRequest.setClusterId(clusterId);
        GetClusterQueueInfoResponse getClusterQueueInfoResponse = PublicSample.getResponse(client, getClusterQueueInfoRequest);
        System.out.println(new Gson().toJson(getClusterQueueInfoResponse));

        //获取集群的detail信息。
        GetClusterDetailsRequest getClusterDetailsRequest = new GetClusterDetailsRequest();
        getClusterDetailsRequest.setClusterId(clusterId);
        GetClusterDetailsResponse getClusterDetailsResponse = PublicSample.getResponse(client, getClusterDetailsRequest);
        System.out.println(new Gson().toJson(getClusterDetailsResponse));

        //获取集群的metrics曲线,最多只能获取最近两小时的数据。
        GetClusterMetricsRequest getClusterMetricsRequest = new GetClusterMetricsRequest();
        getClusterMetricsRequest.setClusterId(clusterId);
        getClusterMetricsRequest.setMetricJson(getClusterMetricJson(clusterId));//指标的名称请查看文档。
        GetClusterMetricsResponse getClusterMetricsResponse = PublicSample.getResponse(client, getClusterMetricsRequest);
        System.out.println(new Gson().toJson(getClusterMetricsResponse));

        //注意:以下操作独享集群的接口只支持按量付费的独享集群。
        //扩容集群,该接口是异步接口,需要通过集群状态来判断是否操作完成。
        ExpandClusterRequest expandClusterRequest = new ExpandClusterRequest();
        expandClusterRequest.setClusterId(clusterId);
        expandClusterRequest.setModel("Ecs_4c8g");//该参数可选,当前不支持混布,只能填与原有的机型一致的机型,或者不填写。
        expandClusterRequest.setCount(20);//目标集群台数。
        ExpandClusterResponse expandClusterResponse = PublicSample.getResponse(client, expandClusterRequest);
        System.out.println(new Gson().toJson(expandClusterResponse));

        //修改集群master的机型配置,只能更改master机型,不能修改master数量。
        //注意:该接口是异步接口,需要通过集群状态来判断是否操作完成。
        ModifyMasterSpecRequest modifyMasterSpecRequest = new ModifyMasterSpecRequest();
        modifyMasterSpecRequest.setClusterId(clusterId);
        modifyMasterSpecRequest.setMasterTargetModel("Ecs_8c16g");
        ModifyMasterSpecResponse modifyMasterSpecResponse = PublicSample.getResponse(client, modifyMasterSpecRequest);
        System.out.println(new Gson().toJson(modifyMasterSpecResponse));

        //缩容集群,缩容有两种参数提供形式(当前仅支持第一种形式。):
        //1.直接填需要去除的实例instanceId(可以通过getClusterDetails接口获得)。
        //2.提供每种机型的目标台数。
        //注意:该接口是异步接口,需要通过集群状态来判断是否操作完成。
        ShrinkClusterRequest shrinkClusterRequest = new ShrinkClusterRequest();
        shrinkClusterRequest.setClusterId(clusterId);
        shrinkClusterRequest.setInstanceIds("111,222,333");
        //shrinkClusterRequest.setModelTargetCount("{"Ecs_4c8g":2,"Ecs_8c16g":7}");
        ShrinkClusterResponse shrinkClusterResponse = PublicSample.getResponse(client, shrinkClusterRequest);
        System.out.println(new Gson().toJson(shrinkClusterResponse));

        //销毁集群,只能销毁按量付费的集群。注意: 该接口操作风险高,请务必谨慎操作。
        DestroyClusterRequest destroyClusterRequest = new DestroyClusterRequest();
        destroyClusterRequest.setClusterId(clusterId);
        DestroyClusterResponse destroyClusterResponse = PublicSample.getResponse(client, destroyClusterRequest);
        System.out.println(new Gson().toJson(destroyClusterResponse));

        //创建project,该接口分两种场景,
        //1.购买的共享集群,需要提供订单ID。
        //2.独享集群上创建project,需要提供独享集群的clusterId。
        CreateProjectRequest createProjectRequest = new CreateProjectRequest();
        //createProjectRequest.setOrderId();//在共享集群上创建project时必填,填写购买的订单id。
        createProjectRequest.setDescription("test project");
        createProjectRequest.setDeployType("cell");//在共享集群上创建project时填public。
        createProjectRequest.setClusterId(clusterId);//在共享集群上创建project时可不填。
        createProjectRequest.setName(projectName);
        CreateProjectResponse createProjectResponse = PublicSample.getResponse(client, createProjectRequest);
        System.out.println(new Gson().toJson(createProjectResponse));

        // 绑定queue到project,该操作类似于将资源(某个queue)指定给某个project使用,只有将资源绑定到了某个project上,
        // 该project里的作业才能使用该资源。当前只支持一对一的关系,即一个project只能绑定一个queue,一个queue只能被绑定至一个project上。
        BindQueueRequest bindQueueRequest = new BindQueueRequest();
        bindQueueRequest.setProjectName(projectName);
        bindQueueRequest.setClusterId(clusterId);
        bindQueueRequest.setQueueName(queueName);
        BindQueueResponse bindQueueResponse = PublicSample.getResponse(client, bindQueueRequest);
        System.out.println(new Gson().toJson(bindQueueResponse));

        //获取指定project已绑定的资源。
        ListProjectBindQueueRequest listProjectBindQueueRequest = new ListProjectBindQueueRequest();
        listProjectBindQueueRequest.setProjectName(projectName);
        ListProjectBindQueueResponse listProjectBindQueueResponse = PublicSample.getResponse(client, listProjectBindQueueRequest);
        System.out.println(new Gson().toJson(listProjectBindQueueResponse));

        //获取project详情。
        GetProjectRequest getProjectRequest = new GetProjectRequest();
        getProjectRequest.setProjectName(projectName);
        GetProjectResponse getProjectResponse = PublicSample.getResponse(client, getProjectRequest);
        System.out.println(new Gson().toJson(getProjectResponse));

        //根据条件搜索project。
        ListProjectRequest listProjectRequest = new ListProjectRequest();
        listProjectRequest.setName(projectName);
        listProjectRequest.setDeployType("cell");
        listProjectRequest.setRegion("cn-shanghai");
        ListProjectResponse listProjectResponse = PublicSample.getResponse(client, listProjectRequest);
        System.out.println(new Gson().toJson(listProjectResponse));

        //从project解绑资源。注意:解绑前,请先将使用了该资源的作业全部下线。
        UnbindQueueRequest unbindQueueRequest = new UnbindQueueRequest();
        unbindQueueRequest.setProjectName(projectName);
        unbindQueueRequest.setClusterId(clusterId);
        unbindQueueRequest.setQueueName(queueName);
        UnbindQueueResponse unbindQueueResponse = PublicSample.getResponse(client, unbindQueueRequest);
        System.out.println(new Gson().toJson(unbindQueueResponse));

        //删除project。注意:删除project是高危操作,请务必谨慎操作。删除的条件是该project下的所有作业均已下线。
        DeleteProjectRequest deleteProjectRequest = new DeleteProjectRequest();
        deleteProjectRequest.setProjectName(projectName);
        DeleteProjectResponse deleteProjectResponse = PublicSample.getResponse(client, deleteProjectRequest);
        System.out.println(new Gson().toJson(deleteProjectResponse));

        //获取文件夹列表。
        ListChildFolderRequest listChildFolderRequest = new ListChildFolderRequest();
        listChildFolderRequest.setPath("/");
        listChildFolderRequest.setProjectName(projectName);
        ListChildFolderResponse listChildFolderResponse = PublicSample.getResponse(client, listChildFolderRequest);
        System.out.println(new Gson().toJson(listChildFolderResponse));

        //获取文件夹。
        GetFolderRequest getFolderRequest = new GetFolderRequest();
        getFolderRequest.setPath(folderName);
        getFolderRequest.setProjectName(projectName);
        GetFolderResponse getFolderResponse = PublicSample.getResponse(client, getFolderRequest);
        System.out.println(new Gson().toJson(getFolderResponse));

        Long folderId;
        GetFolderResponse.Folder folder = getFolderResponse.getFolder();

        if (folder == null || folder.getFolderId() == null) {
            //创建文件夹。
            CreateFolderRequest createFolderRequest = new CreateFolderRequest();
            createFolderRequest.setPath(folderName);
            createFolderRequest.setProjectName(projectName);
            CreateFolderResponse createFolderResponse = PublicSample.getResponse(client, createFolderRequest);
            System.out.println(new Gson().toJson(createFolderResponse));
            folderId = createFolderResponse.getFolderId();
        } else {
            folderId = folder.getFolderId();
        }

        // sdk的package存在用户自己的oss上,需要将package通过oss console或者oss sdk上传到oss上。
        // 此处需要将oss meta信息提供给foas,
        // 并对foas授权bucket读取权限(ram role:AliyunStreamDefaultRole),foas在使用时会从用户指定的oss和bucket上下载package。
        // 请确保oss的region与foas所在的region一致。
        // 如果project建在独享集群集群上,则不需要提供endpoint,bucket等信息,只需要提供package的ossPath。
        // 用户的oss信息在创建独享集群的时候已经提供过了,不需要重复提供。
        CreatePackageRequest createPackageRequest = new CreatePackageRequest();
        createPackageRequest.setProjectName(projectName);
        createPackageRequest.setPackageName(packageName);
        createPackageRequest.setType("jar");
        createPackageRequest.setDescription("test package");
        createPackageRequest.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");//提供的endpoint需要保证网络可通达。
        createPackageRequest.setOssBucket("your bucket");
        createPackageRequest.setOssOwner("111111111");//oss的主账号uid。
        createPackageRequest.setOssPath("aaa/bbb/ccc.jar");
        createPackageRequest.setMd5("123456");//可不填。不填,则跳过md5校验;填写,则在下载时进行完整性校验。
        CreatePackageResponse createPackageResponse = PublicSample.getResponse(client, createPackageRequest);
        System.out.println(new Gson().toJson(createPackageResponse));

        //更新package。
        UpdatePackageRequest updatePackageRequest = new UpdatePackageRequest();
        updatePackageRequest.setProjectName(projectName);
        updatePackageRequest.setPackageName(packageName);
        updatePackageRequest.setOssPath("aaa/bbb/ddd.jar");
        UpdatePackageResponse updatePackageResponse = PublicSample.getResponse(client, updatePackageRequest);
        System.out.println(new Gson().toJson(updatePackageResponse));

        //获取package详情。
        GetPackageRequest getPackageRequest = new GetPackageRequest();
        getPackageRequest.setProjectName(projectName);
        getPackageRequest.setPackageName(packageName);
        GetPackageResponse getPackageResponse = PublicSample.getResponse(client, getPackageRequest);
        System.out.println(new Gson().toJson(getPackageResponse));

        //根据条件搜索package。
        ListPackageRequest listPackageRequest = new ListPackageRequest();
        listPackageRequest.setProjectName(projectName);
        listPackageRequest.setPackageName("aa");//模糊匹配
        listPackageRequest.setType("jar");
        listPackageRequest.setPageIndex(1);//可选,默认1
        listPackageRequest.setPageSize(30);//可选,默认10
        ListPackageResponse listPackageResponse = PublicSample.getResponse(client, listPackageRequest);
        System.out.println(new Gson().toJson(listPackageResponse));

        //列举project绑定的cluster和queue。
        ListProjectBindQueueRequest listProjectBindQueueRequest2 = new ListProjectBindQueueRequest();
        listProjectBindQueueRequest2.setProjectName(projectName);
        ListProjectBindQueueResponse listProjectBindQueueResponse2 = PublicSample.getResponse(client, listProjectBindQueueRequest2);
        System.out.println(new Gson().toJson(listProjectBindQueueResponse2));

        //查询project下绑定的queue的资源。
        ListProjectBindQueueResourceRequest listProjectBindQueueResourceRequest = new ListProjectBindQueueResourceRequest();
        listProjectBindQueueResourceRequest.setProjectName(projectName);
        ListProjectBindQueueResourceResponse listProjectBindQueueResourceResponse = PublicSample.getResponse(client, listProjectBindQueueResourceRequest);
        System.out.println(new Gson().toJson(listProjectBindQueueResourceResponse));

        //根据条件搜索project下的作业。
        ListJobRequest listJobRequest = new ListJobRequest();
        listJobRequest.setProjectName(projectName);
        listJobRequest.setJobType("flink_stream");
        listJobRequest.setApiType("sql");
        //该参数控制返回的job信息中是否返回较长的字段(例如code)。填false,则返回的只有部分字段;不填或者填true,则返回所有字段。
        listJobRequest.setIsShowFullField(false);
        ListJobResponse listJobResponse = PublicSample.getResponse(client, listJobRequest);
        System.out.println(new Gson().toJson(listJobResponse));

        //获取作业。
        GetJobRequest getJobRequest = new GetJobRequest();
        getJobRequest.setProjectName(projectName);
        getJobRequest.setJobName(jobName);
        GetJobResponse getJobResponse = PublicSample.getResponse(client, getJobRequest);
        System.out.println(new Gson().toJson(getJobResponse));

        GetJobResponse.Job job = getJobResponse.getJob();
        if (job == null || job.getJobName() == null) {
            //创建作业。
            CreateJobRequest createJobRequest = new CreateJobRequest();
            createJobRequest.setProjectName(projectName);
            createJobRequest.setJobName(jobName);
            createJobRequest.setCode(sql);
            createJobRequest.setProperties(properties);//无,则不用设置。
            //clusterId和queueName可以不配置。不配置,则随机从项目绑定的cluster和queue中选择一个。
            createJobRequest.setClusterId(clusterId);
            createJobRequest.setQueueName(queueName);
            //engineVersion可以不配置。不配置,则使用current版本(推荐版本)。
            createJobRequest.setEngineVersion(engineVersion);
            createJobRequest.setDescription("test");
            createJobRequest.setPackages(packageName);//多个package之间用英文逗号分隔。
            createJobRequest.setJobType("flink_stream");//flink_stream/flink_batch大小写不敏感。
            createJobRequest.setApiType("sql");//datastream/sql大小写不敏感。
            createJobRequest.setFolderId(folderId);

            CreateJobResponse createJobResponse = PublicSample.getResponse(client, createJobRequest);
            System.out.println(new Gson().toJson(createJobResponse));
        } else {
            //修改作业。
            UpdateJobRequest updateJobRequest = new UpdateJobRequest();
            updateJobRequest.setProjectName(projectName);
            updateJobRequest.setJobName(jobName);
            updateJobRequest.setCode(sql);//设置需要修改的字段,不修改的字段不需要设置。

            UpdateJobResponse updateJobResponse = PublicSample.getResponse(client, updateJobRequest);
            System.out.println(new Gson().toJson(updateJobResponse));
        }

        //返回引用指定package的job,该接口查询上线后的job引用关系,未上线的job不会返回。
        GetRefPackageJobRequest getRefPackageJobRequest = new GetRefPackageJobRequest();
        getRefPackageJobRequest.setProjectName(projectName);
        getRefPackageJobRequest.setPackageName(packageName);
        GetRefPackageJobResponse getRefPackageJobResponse = PublicSample.getResponse(client, getRefPackageJobRequest);
        System.out.println(new Gson().toJson(getRefPackageJobResponse));

        //校验job是否存在语法错误。
        ValidateJobRequest validateJobRequest = new ValidateJobRequest();
        validateJobRequest.setProjectName(projectName);
        validateJobRequest.setJobName(jobName);
        ValidateJobResponse validateJobResponse = PublicSample.getResponse(client, validateJobRequest);
        System.out.println(new Gson().toJson(validateJobResponse));

        //获取job的planjson。该接口是异步接口,提交完成之后,需要调用CheckRawPlanJson接口获取结果:Session in run/success/fail。
        GetRawPlanJsonRequest getRawPlanJsonRequest = new GetRawPlanJsonRequest();
        getRawPlanJsonRequest.setProjectName(projectName);
        getRawPlanJsonRequest.setJobName(jobName);
        getRawPlanJsonRequest.setAutoconfEnable(true);//可以不填,默认为false。
        //设置这个job预期使用多少资源来运行,这个参数会影响到生成plan的大小,不提供的话,底层引擎会生成默认资源配置。
        getRawPlanJsonRequest.setExpectedCore(2f);//这个参数与下面的参数可以同时提供,也可以同时不提供,不允许一个提供,一个不提供。
        getRawPlanJsonRequest.setExpectedGB(9f);//
        GetRawPlanJsonResponse getRawPlanJsonResponse = PublicSample.getResponse(client, getRawPlanJsonRequest);

        CheckRawPlanJsonRequest checkRawPlanJsonRequest = new CheckRawPlanJsonRequest();
        checkRawPlanJsonRequest.setProjectName(projectName);
        checkRawPlanJsonRequest.setJobName(jobName);
        checkRawPlanJsonRequest.setSessionId(getRawPlanJsonResponse.getSessionId());

        String planJson;
        CheckRawPlanJsonResponse checkRawPlanJsonResponse;
        while (true) {
            checkRawPlanJsonResponse = PublicSample.getResponse(client, checkRawPlanJsonRequest);
            System.out.println(checkRawPlanJsonResponse.getPlanJsonInfo().getStatus());
            if (checkRawPlanJsonResponse.getPlanJsonInfo().getStatus().equals("success")) {
                planJson = checkRawPlanJsonResponse.getPlanJsonInfo().getPlanJson();
                break;
            } else if (checkRawPlanJsonResponse.getPlanJsonInfo().getStatus().equals("fail")) {
                System.out.println(checkRawPlanJsonResponse.getPlanJsonInfo().getErrorMessage());
                return;
            }
            Thread.sleep(1000);//每秒查询一次。
        }

        //修改作业planjson
        UpdateJobRequest updateJobRequest = new UpdateJobRequest();
        updateJobRequest.setProjectName(projectName);
        updateJobRequest.setJobName(jobName);
        updateJobRequest.setPlanJson(planJson);
        UpdateJobResponse updateJobResponse = PublicSample.getResponse(client, updateJobRequest);
        System.out.println(new Gson().toJson(updateJobResponse));

        //预估当前job所配置的planjson需要的资源数,为上线接口需要提供的maxCU数值提供参考,仅blink-3.2.1及以上版本支持该功能。
        CalcPlanJsonResourceRequest calcPlanJsonResourceRequest = new CalcPlanJsonResourceRequest();
        calcPlanJsonResourceRequest.setProjectName(projectName);
        calcPlanJsonResourceRequest.setJobName(jobName);
        CalcPlanJsonResourceResponse calcPlanJsonResourceResponse = PublicSample.getResponse(client, calcPlanJsonResourceRequest);
        System.out.println(new Gson().toJson(calcPlanJsonResourceResponse));

        //获取作业上一次运行记录的最后一次自动调优的planjson,注意:是上一次的运行记录,包括停止的和暂停的。如果不存在话返回null。
        GetJobLatestAutoScalePlanRequest getJobLatestAutoScalePlanRequest = new GetJobLatestAutoScalePlanRequest();
        getJobLatestAutoScalePlanRequest.setProjectName(projectName);
        getJobLatestAutoScalePlanRequest.setJobName(jobName);
        GetJobLatestAutoScalePlanResponse getJobLatestAutoScalePlanResponse = PublicSample.getResponse(client, getJobLatestAutoScalePlanRequest);
        System.out.println(new Gson().toJson(getJobLatestAutoScalePlanResponse));

        //上线作业。
        CommitJobRequest commitJobRequest = new CommitJobRequest();
        commitJobRequest.setProjectName(projectName);
        commitJobRequest.setJobName(jobName);
        commitJobRequest.setIsOnOff(true);//是否开启autoscale自动调优。不填,表示不开启,仅blink-3.2.1及以上版本支持该功能。
        commitJobRequest.setMaxCU(100);//设置最大资源使用上限。
        commitJobRequest.setConfigure("{\"fetch_delay\":44.5}");//json格式,当前仅支持这一个配置项,大小写敏感,value是double型。
        CommitJobResponse commitJobResponse = PublicSample.getResponse(client, commitJobRequest);
        System.out.println(new Gson().toJson(commitJobResponse));

        //获取实例。
        GetInstanceRequest getInstanceRequest = new GetInstanceRequest();
        getInstanceRequest.setProjectName(projectName);
        getInstanceRequest.setJobName(jobName);

        //-1表示获取流作业的当前运行实例(因为流作业某一时刻最多只会存在一个示例)。注意:批作业需要填实际的运行实例id。
        getInstanceRequest.setInstanceId(-1L);
        GetInstanceResponse getInstanceResponse = PublicSample.getResponse(client, getInstanceRequest);
        System.out.println(new Gson().toJson(getInstanceResponse));

        GetInstanceResponse.Instance instance = getInstanceResponse.getInstance();

        {
            //启动作业。
            if (instance == null || "UNKNOWN".equals(instance.getActualState()) || "TERMINATED".equals(instance.getActualState())) {
                StartJobRequest startJobRequest = new StartJobRequest();
                startJobRequest.setProjectName(projectName);
                startJobRequest.setJobName(jobName);

                Map map = new HashMap<String, String>();
                //startOffset表示启动点位
                map.put("startOffset", String.valueOf(System.currentTimeMillis()));//精确到ms时间戳。
                startJobRequest.setParameterJson(new Gson().toJson(map));//json格式参数。
                StartJobResponse startJobResponse = PublicSample.getResponse(client, startJobRequest);
                System.out.println(new Gson().toJson(startJobResponse));

                //等待启动成功。
                while (true) {
                    GetInstanceRunSummaryRequest getInstanceRunSummaryRequest = new GetInstanceRunSummaryRequest();
                    getInstanceRunSummaryRequest.setProjectName(projectName);
                    getInstanceRunSummaryRequest.setJobName(jobName);
                    getInstanceRunSummaryRequest.setInstanceId(-1L);
                    GetInstanceRunSummaryResponse getInstanceRunSummaryResponse = PublicSample.getResponse(client, getInstanceRunSummaryRequest);
                    System.out.println(new Gson().toJson(getInstanceRunSummaryResponse));

                    if ("WAITING".equals(getInstanceRunSummaryResponse.getRunSummary().getActualState())) {
                        System.out.println(String.format("lastErrorTime[%s], lastErrorMessage[%s]",
                                getInstanceRunSummaryResponse.getRunSummary().getLastErrorTime(),
                                getInstanceRunSummaryResponse.getRunSummary().getLastErrorMessage()));
                        Thread.sleep(1000);
                    } else {
                        break;
                    }
                }
            }
        }

        {
            // 作业启动成功后,可以调用如下的接口进行运维。
            //获取instance的运行dag图。
            GetInstanceDetailRequest getInstanceDetailRequest = new GetInstanceDetailRequest();
            getInstanceDetailRequest.setProjectName(projectName);
            getInstanceDetailRequest.setJobName(jobName);
            getInstanceDetailRequest.setInstanceId(-1L);
            GetInstanceDetailResponse getInstanceDetailResponse = PublicSample.getResponse(client, getInstanceDetailRequest);
            System.out.println(new Gson().toJson(getInstanceDetailResponse));

            //获取instance的运行时配置。
            GetInstanceConfigRequest getInstanceConfigRequest = new GetInstanceConfigRequest();
            getInstanceConfigRequest.setProjectName(projectName);
            getInstanceConfigRequest.setJobName(jobName);
            getInstanceConfigRequest.setInstanceId(-1L);
            GetInstanceConfigResponse getInstanceConfigResponse = PublicSample.getResponse(client, getInstanceConfigRequest);
            System.out.println(new Gson().toJson(getInstanceConfigResponse));

            //获取instance的运行时checkpoint信息。
            GetInstanceCheckpointRequest getInstanceCheckpointRequest = new GetInstanceCheckpointRequest();
            getInstanceCheckpointRequest.setProjectName(projectName);
            getInstanceCheckpointRequest.setJobName(jobName);
            getInstanceCheckpointRequest.setInstanceId(-1L);
            GetInstanceCheckpointResponse getInstanceCheckpointResponse = PublicSample.getResponse(client, getInstanceCheckpointRequest);
            System.out.println(new Gson().toJson(getInstanceCheckpointResponse));

            //获取instance的运行时异常信息(failover信息)。
            GetInstanceExceptionsRequest getInstanceExceptionsRequest = new GetInstanceExceptionsRequest();
            getInstanceExceptionsRequest.setProjectName(projectName);
            getInstanceExceptionsRequest.setJobName(jobName);
            getInstanceExceptionsRequest.setInstanceId(-1L);
            GetInstanceExceptionsResponse getInstanceExceptionsResponse = PublicSample.getResponse(client, getInstanceExceptionsRequest);
            System.out.println(new Gson().toJson(getInstanceExceptionsResponse));

            //如果上线时作业开启了自动调优,调优效果不理想,可中途关闭自动调优,关闭之后也可以再次打开。
            UpdateAutoScaleConfigRequest updateAutoScaleConfigRequest = new UpdateAutoScaleConfigRequest();
            updateAutoScaleConfigRequest.setProjectName(projectName);
            updateAutoScaleConfigRequest.setJobName(jobName);
            updateAutoScaleConfigRequest.setInstanceId(-1L);
            updateAutoScaleConfigRequest.setConfigJson("{\"autoscale.enable\":false}");//json格式,当前只支持这个key,大小写敏感,取值为true或者false。
            UpdateAutoScaleConfigResponse updateAutoScaleConfigResponse = PublicSample.getResponse(client, updateAutoScaleConfigRequest);
            System.out.println(new Gson().toJson(updateAutoScaleConfigResponse));

            //查询instance的运行时各项指标的数据曲线信息。
            //当前只支持查询最近两小时的曲线,metric的格式为:blink.{projectName}.{jobName}.{metricName}。
            String metricJson = "{\n" +
                    "\t\"start\": 1547637620000,\n" + //精确到ms时间戳。
                    "\t\"limit\": \"avg:sample:50\",\n" +
                    "\t\"end\": 1547638420000,\n" +
                    "\t\"queries\": [{\n" +
                    "\t\t\"downsample\": \"20s-avg\",\n" +
                    "\t\t\"metric\": \"blink.bayes_team.huayuan_test_job.delay\",\n" +
                    "\t\t\"granularity\": \"20s\",\n" +
                    "\t\t\"aggregator\": \"max\"\n" +
                    "\t}, {\n" +
                    "\t\t\"downsample\": \"20s-avg\",\n" +
                    "\t\t\"metric\": \"blink.bayes_team.huayuan_test_job.fetched_delay\",\n" +
                    "\t\t\"granularity\": \"20s\",\n" +
                    "\t\t\"aggregator\": \"max\"\n" +
                    "\t}]\n" +
                    "}";

            GetInstanceMetricRequest getInstanceMetricRequest = new GetInstanceMetricRequest();
            getInstanceMetricRequest.setProjectName(projectName);
            getInstanceMetricRequest.setJobName(jobName);
            getInstanceMetricRequest.setInstanceId(-1L);
            getInstanceMetricRequest.setMetricJson(metricJson);
            GetInstanceMetricResponse getInstanceMetricResponse = PublicSample.getResponse(client, getInstanceMetricRequest);
            System.out.println(new Gson().toJson(getInstanceMetricResponse.getMetrics()));

            //获取instance实际使用的资源信息(cpu/memory)。
            GetInstanceResourceRequest getInstanceResourceRequest = new GetInstanceResourceRequest();
            getInstanceResourceRequest.setProjectName(projectName);
            getInstanceResourceRequest.setJobName(jobName);
            getInstanceResourceRequest.setInstanceId(-1L);
            GetInstanceResourceResponse getInstanceResourceResponse = PublicSample.getResponse(client, getInstanceResourceRequest);
            System.out.println(new Gson().toJson(getInstanceResourceResponse.getResource()));

            //批量获取多个作业的运行时状态。
            BatchGetInstanceRunSummaryRequest batchGetInstanceRunSummaryRequest = new BatchGetInstanceRunSummaryRequest();
            batchGetInstanceRunSummaryRequest.setProjectName(projectName);
            batchGetInstanceRunSummaryRequest.setJobType("flink_stream");
            batchGetInstanceRunSummaryRequest.setJobNames("job1,job2,job3");//多个job之间用英文逗号分隔,请确保都是stream job,并且job已存在且已上线,否则会忽略错误的job。
            BatchGetInstanceRunSummaryResponse batchGetInstanceRunSummaryResponse = PublicSample.getResponse(client, batchGetInstanceRunSummaryRequest);
            System.out.println(new Gson().toJson(batchGetInstanceRunSummaryResponse.getRunSummarys()));
        }

        {
            //暂停作业。
            GetInstanceRunSummaryRequest getInstanceRunSummaryRequest = new GetInstanceRunSummaryRequest();
            getInstanceRunSummaryRequest.setProjectName(projectName);
            getInstanceRunSummaryRequest.setJobName(jobName);
            getInstanceRunSummaryRequest.setInstanceId(-1L);
            GetInstanceRunSummaryResponse getInstanceRunSummaryResponse = PublicSample.getResponse(client, getInstanceRunSummaryRequest);
            System.out.println(new Gson().toJson(getInstanceRunSummaryResponse));

            if ("RUNNING".equals(getInstanceRunSummaryResponse.getRunSummary().getActualState())) {
                ModifyInstanceStateRequest modifyInstanceStateRequest = new ModifyInstanceStateRequest();
                modifyInstanceStateRequest.setProjectName(projectName);
                modifyInstanceStateRequest.setJobName(jobName);
                modifyInstanceStateRequest.setInstanceId(-1L);
                modifyInstanceStateRequest.setExpectState("PAUSED");
                ModifyInstanceStateResponse modifyInstanceStateResponse = PublicSample.getResponse(client, modifyInstanceStateRequest);
                System.out.println(new Gson().toJson(modifyInstanceStateResponse));

                //等待暂停成功。
                while (true) {
                    getInstanceRunSummaryRequest = new GetInstanceRunSummaryRequest();
                    getInstanceRunSummaryRequest.setProjectName(projectName);
                    getInstanceRunSummaryRequest.setJobName(jobName);
                    getInstanceRunSummaryRequest.setInstanceId(-1L);
                    getInstanceRunSummaryResponse = PublicSample.getResponse(client, getInstanceRunSummaryRequest);
                    System.out.println(new Gson().toJson(getInstanceRunSummaryResponse));

                    if ("RUNNING".equals(getInstanceRunSummaryResponse.getRunSummary().getActualState())) {
                        System.out.println(String.format("lastErrorTime[%s], lastErrorMessage[%s]",
                                getInstanceRunSummaryResponse.getRunSummary().getLastErrorTime(),
                                getInstanceRunSummaryResponse.getRunSummary().getLastErrorMessage()));
                        Thread.sleep(1000);
                    } else {
                        break;
                    }
                }
            }
        }

        {
            //恢复作业。
            GetInstanceRunSummaryRequest getInstanceRunSummaryRequest = new GetInstanceRunSummaryRequest();
            getInstanceRunSummaryRequest.setProjectName(projectName);
            getInstanceRunSummaryRequest.setJobName(jobName);
            getInstanceRunSummaryRequest.setInstanceId(-1L);
            GetInstanceRunSummaryResponse getInstanceRunSummaryResponse = PublicSample.getResponse(client, getInstanceRunSummaryRequest);
            System.out.println(new Gson().toJson(getInstanceRunSummaryResponse));

            if ("PAUSED".equals(getInstanceRunSummaryResponse.getRunSummary().getActualState())) {
                ModifyInstanceStateRequest modifyInstanceStateRequest = new ModifyInstanceStateRequest();
                modifyInstanceStateRequest.setProjectName(projectName);
                modifyInstanceStateRequest.setJobName(jobName);
                modifyInstanceStateRequest.setInstanceId(-1L);
                modifyInstanceStateRequest.setExpectState("RUNNING");
                ModifyInstanceStateResponse modifyInstanceStateResponse = PublicSample.getResponse(client, modifyInstanceStateRequest);
                System.out.println(new Gson().toJson(modifyInstanceStateResponse));

                //等待恢复成功。
                while (true) {
                    getInstanceRunSummaryRequest = new GetInstanceRunSummaryRequest();
                    getInstanceRunSummaryRequest.setProjectName(projectName);
                    getInstanceRunSummaryRequest.setJobName(jobName);
                    getInstanceRunSummaryRequest.setInstanceId(-1L);
                    getInstanceRunSummaryResponse = PublicSample.getResponse(client, getInstanceRunSummaryRequest);
                    System.out.println(new Gson().toJson(getInstanceRunSummaryResponse));

                    if ("PAUSED".equals(getInstanceRunSummaryResponse.getRunSummary().getActualState())) {
                        System.out.println(String.format("lastErrorTime[%s], lastErrorMessage[%s]",
                                getInstanceRunSummaryResponse.getRunSummary().getLastErrorTime(),
                                getInstanceRunSummaryResponse.getRunSummary().getLastErrorMessage()));
                        Thread.sleep(1000);
                    } else {
                        break;
                    }
                }
            }
        }

        {
            //停止作业。
            GetInstanceRunSummaryRequest getInstanceRunSummaryRequest = new GetInstanceRunSummaryRequest();
            getInstanceRunSummaryRequest.setProjectName(projectName);
            getInstanceRunSummaryRequest.setJobName(jobName);
            getInstanceRunSummaryRequest.setInstanceId(-1L);
            GetInstanceRunSummaryResponse getInstanceRunSummaryResponse = PublicSample.getResponse(client, getInstanceRunSummaryRequest);
            System.out.println(new Gson().toJson(getInstanceRunSummaryResponse));

            if ("RUNNING".equals(getInstanceRunSummaryResponse.getRunSummary().getActualState())) {
                ModifyInstanceStateRequest modifyInstanceStateRequest = new ModifyInstanceStateRequest();
                modifyInstanceStateRequest.setProjectName(projectName);
                modifyInstanceStateRequest.setJobName(jobName);
                modifyInstanceStateRequest.setInstanceId(-1L);
                modifyInstanceStateRequest.setExpectState("TERMINATED");
                ModifyInstanceStateResponse modifyInstanceStateResponse = PublicSample.getResponse(client, modifyInstanceStateRequest);
                System.out.println(new Gson().toJson(modifyInstanceStateResponse));

                //等待停止成功。
                while (true) {
                    getInstanceRunSummaryRequest = new GetInstanceRunSummaryRequest();
                    getInstanceRunSummaryRequest.setProjectName(projectName);
                    getInstanceRunSummaryRequest.setJobName(jobName);
                    getInstanceRunSummaryRequest.setInstanceId(-1L);
                    getInstanceRunSummaryResponse = PublicSample.getResponse(client, getInstanceRunSummaryRequest);
                    System.out.println(new Gson().toJson(getInstanceRunSummaryResponse));

                    if ("RUNNING".equals(getInstanceRunSummaryResponse.getRunSummary().getActualState())) {
                        System.out.println(String.format("lastErrorTime[%s], lastErrorMessage[%s]",
                                getInstanceRunSummaryResponse.getRunSummary().getLastErrorTime(),
                                getInstanceRunSummaryResponse.getRunSummary().getLastErrorMessage()));
                        Thread.sleep(1000);
                    } else {
                        break;
                    }
                }
            }
        }

        //下线作业。
        OfflineJobRequest offlineJobRequest = new OfflineJobRequest();
        offlineJobRequest.setProjectName(projectName);
        offlineJobRequest.setJobName(jobName);
        OfflineJobResponse offlineJobResponse = PublicSample.getResponse(client, offlineJobRequest);
        System.out.println(new Gson().toJson(offlineJobResponse));

        //删除作业。
        DeleteJobRequest deleteJobRequest = new DeleteJobRequest();
        deleteJobRequest.setProjectName(projectName);
        deleteJobRequest.setJobName(jobName);
        DeleteJobResponse deleteJobResponse = PublicSample.getResponse(client, deleteJobRequest);
        System.out.println(new Gson().toJson(deleteJobResponse));

        //删除package。
        DeletePackageRequest deletePackageRequest = new DeletePackageRequest();
        deletePackageRequest.setProjectName(projectName);
        deletePackageRequest.setPackageName(packageName);
        DeletePackageResponse deletePackageResponse = PublicSample.getResponse(client, deletePackageRequest);
        System.out.println(new Gson().toJson(deletePackageResponse));
    }
}