数据开发者在使用MaxCompute开发过程中,需要统计MaxCompute项目中账号的费用以及作业的耗时情况,助力合理规划和调整作业。本文为您介绍如何通过MaxCompute元数据(Information
Schema)统计TOP费用账号及耗时作业,同时通过钉钉推送到客户群。
背景信息
通常,数据开发者会通过DataWorks标准模式使用MaxCompute,MaxCompute会在Information Schema中记录所有作业的执行账号为同一个主账号,只有小部分的作业执行账号为RAM用户。此时数据开发者会关注如何统计各个账号的费用和耗时作业。MaxCompute提供如下方案解决这两个问题:
- 账号费用:您可以通过账单详情中的用量明细来查询,但是这种方式无法将用量明细归属到对应的RAM用户。Information Schema视图中的TASKS_HISTORY会记录MaxCompute项目内已完成的作业详情,且保留近14天数据。您可以将TASKS_HISTORY中的数据备份到指定MaxCompute项目中,基于该数据统计TOP费用账号。
- 耗时作业:您可以通过TASKS_HISTORY中的数据统计TOP耗时作业。
更多关于Information Schema的功能及使用限制,请参见Information Schema概述。
步骤一:获取Information Schema服务
自2020年12月1日起,对于新创建的MaxCompute项目,MaxCompute默认提供Information Schema相关的元数据视图,您无需手工安装Information
Schema权限包。
对于存量MaxCompute项目,在您开始使用Information Schema服务前,需要以项目空间所有者(Project Owner)或具备Super_Administrato角色的RAM用户身份安装Information
Schema权限包,获得访问项目元数据的权限。安装方式有如下两种:
说明 如果统计多个MaxCompute项目的元数据,您需要分别对各个MaxCompute项目安装Information Schema权限包。然后把各个MaxCompute项目的元数据的备份数据插入到同一个表中做集中统计分析。
(可选)步骤二:对除Project Owner外的用户授权
Information Schema的视图包含了项目级别的所有用户数据,默认项目空间所有者可以查看。如果项目内其他用户或角色需要查看,需要进行授权,请参见MaxCompute Package授权方法。
授权语法如下。
grant actions on package <pkgName> to user <username>;
grant actions on package <pkgName> to role <role_name>;
授权示例如下。
grant read on package Information_Schema.systables to user RAM$name@your_account.com:user01;
步骤三:下载并备份元数据
在MaxCompute项目上创建元数据备份表,并定时将元数据写入备份表中。以MaxCompute客户端为例,操作流程如下:
- 登录MaxCompute客户端,执行如下命令创建元数据备份表。
--project_name为MaxCompute项目名称。
create table if not exists <project_name>.information_history
(
task_catalog STRING
,task_schema STRING
,task_name STRING
,task_type STRING
,inst_id STRING
,`status` STRING
,owner_id STRING
,owner_name STRING
,result STRING
,start_time DATETIME
,end_time DATETIME
,input_records BIGINT
,output_records BIGINT
,input_bytes BIGINT
,output_bytes BIGINT
,input_tables STRING
,output_tables STRING
,operation_text STRING
,signature STRING
,complexity DOUBLE
,cost_cpu DOUBLE
,cost_mem DOUBLE
,settings STRING
,ds STRING
);
- 进入DataWorks数据开发界面,创建ODPS SQL节点(information_history)并配置定时调度,用于定时将数据写入备份表information_history。完成后单击左上角
图标保存。
创建ODPS SQL节点操作,请参见创建ODPS SQL节点。
ODPS SQL节点运行的命令示例如下:
--project_name为MaxCompute项目名称。
use <project_name>;
insert into table <project_name>.information_history select * from information_schema.tasks_history where ds ='${datetime1}';
${datetime1}
为DataWorks的调度参数,您需要在ODPS SQL节点右侧,单击调度配置,在基础属性区域配置参数值为datetime1=${yyyymmdd}
。
说明 如果需要同时对多个MaxCompute项目的元数据进行统计分析,您可以创建多个ODPS SQL节点,将这些MaxCompute项目的元数据写入到同一张数据备份表中。
步骤四:创建统计TOPN费用账号及耗时作业
TASKS_HISTORY视图中的settings会记录上层调度或用户传入的信息,以JSON格式存储。包含的具体信息有:useragent、bizid、skynet_id和skynet_nodename。您可以通过settings字段定位到创建作业的RAM用户信息。因此您可以基于备份数据表计算TOPN费用账号及耗时作业。操作流程如下:
- 登录MaxCompute客户端,创建一张RAM用户明细表user_ram,记录需要统计的账号及账号ID。
命令示例如下:
create table if not exists <project_name>.user_ram
(
user_id STRING
,user_name STRING
);
- 创建一张统计账号费用的明细表cost_topn,记录TOPN费用账号明细。
命令示例如下:
create table if not exists <project_name>.cost_topn
(
cost_sum DECIMAL(38,5)
,task_owner STRING
)
partitioned by
(
ds STRING
);
- 建一张统计耗时作业的明细表time_topn,记录TOPN耗时作业明细。
命令示例如下:
create table if not exists <project_name>.time_topn
(
inst_id STRING
,cost_time BIGINT
,task_owner STRING
)
partitioned by
(
ds STRING
);
- 进入DataWorks数据开发界面,创建ODPS SQL节点(topn)并配置定时调度,用于定时将cost_topn表中统计的数据写入user_ram表。完成后单击左上角
图标保存。
创建ODPS SQL节点操作,请参见创建ODPS SQL节点。
ODPS SQL节点运行的命令示例如下:
--开启2.0数据类型开关。2.0数据类型详情,请参见2.0数据类型版本。
set odps.sql.decimal.odps2=true;
--将元数据写入cost_topn、time_topn表。user_id为账号ID。您可以在个人信息页面查看账号ID。
insert into table <project_name>.cost_topn partition (ds = '${datetime1}')
select
nvl(cost_sum,0) cost_sum
,case when a.task_owner='<user_id>' or a.task_owner='<user_id>' or a.task_owner='<user_id>' then b.user_name
else a.task_owner
end task_owner
from (
select inst_id
,owner_name
,task_type
,a.input_bytes
,a.cost_cpu
,a.status
,case when a.task_type = 'SQL' then cast(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 as DECIMAL(18,5) )
when a.task_type = 'SQLRT' then cast(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 as DECIMAL(18,5) )
when a.task_type = 'CUPID' and a.status='Terminated'then cast(a.cost_cpu/100/3600 * 0.66 as DECIMAL(18,5) )
else 0
end cost_sum
,a.settings
,get_json_object(settings, "$.SKYNET_ONDUTY") owner
,case when get_json_object(a.settings, "$.SKYNET_ONDUTY") is null then owner_name
else get_json_object(a.settings, "$.SKYNET_ONDUTY")
end task_owner
from information_history
where ds = '${datetime1}'
) a
left join <project_name>.user_ram b
on a.task_owner = b.user_id;
insert into table <project_name>.time_topn partition(ds = '${datetime1}')
select inst_id
,cost_time
,case when a.task_owner='<user_id>' or a.task_owner='<user_id>' or a.task_owner='<user_id>' then b.user_name
else a.task_owner
end task_owner
from (
select inst_id
,task_type
,status
,datediff(a.end_time, a.start_time, 'ss') AS cost_time
,case when get_json_object(a.settings, "$.SKYNET_ONDUTY") is null then owner_name
else get_json_object(a.settings, "$.SKYNET_ONDUTY")
end task_owner
from <project_name>.information_history a
where ds = '${datetime1}'
) a
left join <project_name>.user_ram b
on a.task_owner = b.user_id
;
说明 示例中的
task_type = 'SQL'
表示SQL作业,
task_type = 'SQLRT'
表示查询加速作业,
task_type = 'CUPID'
表示Spark作业。如果需要统计其他计费作业,例如MapReduce、
Lightning(交互式分析)、Mras,您可以按照计费公式添加相应代码行。计费详情,请参见
计算费用(按量计费)。
${datetime1}
为DataWorks的调度参数,您需要在ODPS SQL节点右侧,单击调度配置,在基础属性区域配置参数值为datetime1=${yyyymmdd}
。
步骤五:创建钉钉群机器人并推送TOPN费用账号及耗时作业信息
以PC端为例,创建钉钉群机器人并推送TOPN费用账号及耗时作业信息的操作流程如下:
- 创建钉钉群机器人。
- 选择目标钉钉群,单击右上角的
图标。
- 在群设置面板,单击智能群助手。
- 在智能群助手面板,单击添加机器人。
- 在群机器人对话框的添加机器人区域,单击
图标。
- 在群机器人对话框,单击自定义机器人。
- 在机器人详情对话框,单击添加。
- 在添加机器人对话框,编辑机器人信息。
属性名称 |
设置规则 |
头像 |
单击头像右下角的 图标来编辑头像。
|
机器人名字 |
输入机器人名字。 |
安全设置 |
完成必要的安全设置(至少选择1种),勾选我已阅读并同意《自定义机器人服务及免责条款》,单击完成。
安全设置有3种方式:
- 自定义关键词:最多可以设置10个关键词。
- 加签:勾选加签可以获取到机器人的密钥。
- IP地址(段):只有来自IP地址范围内的请求才会被正常处理。
|
- 在添加机器人对话框,复制生成的Webhook地址。单击完成。
注意 请保管好此Webhook地址,不要公布在外部网站上,泄露后会有安全风险。
- 通过Intellij IDEA创建MAVEN项目并编译推送钉钉群消息的Java程序,编译完成后生成JAR包。
Intellij IDEA操作详情,请单击Intellij IDEA工具界面右上角的Help获取。
- 配置Pom依赖。
Pom依赖如下。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>DingTalk_Information</groupId>
<artifactId>DingTalk_Information</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.35.5-public</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.15</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibaba-dingtalk-service-sdk</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-jdbc</artifactId>
<version>3.0.1</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<!-- get all project dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- MainClass in mainfest make a executable jar -->
<archive>
<manifest>
<mainClass>com.alibaba.sgri.message.test</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- 开发Java程序并生成JAR包topn_new.jar。
Java代码示例如下:
package com.alibaba.sgri.message;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ResultSet;
import com.aliyun.odps.task.SQLTask;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;
import com.dingtalk.api.response.OapiRobotSendResponse;
import com.taobao.api.ApiException;
public class test {
public static void main(String[] args) throws ApiException {
if (args.length < 1) {
System.out.println("请输入日期参数");
System.exit(0);
}
System.out.println("开始读取数据");
DingTalkClient client = new DefaultDingTalkClient(
"https://oapi.dingtalk"
+ ".com/robot/send?access_token=<机器人Webhook地址>\n");
OapiRobotSendRequest request = new OapiRobotSendRequest();
request.setMsgtype("markdown");
OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
//这里的日期作为参数
markdown.setText(getContent(args[0]));
markdown.setTitle("作业消费TOPN");
request.setMarkdown(markdown);
OapiRobotSendResponse response = client.execute(request);
System.out.println("消息发送成功");
}
/**
* 读取ODPS,获取要发送的数据
*/
public static String getContent(String day) {
Odps odps = createOdps();
StringBuilder sb = new StringBuilder();
try {
//==================这是费用账号=====================
String costTopnSql = "select sum(cost_sum)cost_sum,task_owner from cost_topn where ds='" + day + "' " + "group by task_owner order by cost_sum desc limit 5;";
Instance costInstance = SQLTask.run(odps, costTopnSql);
costInstance.waitForSuccess();
ResultSet costTopnRecords = SQLTask.getResultSet(costInstance);
sb.append("<font color=#FF0000 size=4>").append("费用账号TOPN(").append(day).append(
")[按照阿里云按量付费计算]").append("</font>").append("\n\n");
AtomicInteger costIndex = new AtomicInteger(1);
costTopnRecords.forEach(item -> {
sb.append(costIndex.getAndIncrement()).append(".").append("账号:");
sb.append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>");
sb.append(" ").append(" ").append("消费:").append("<font color=#2E64FE>").append(item.get("cost_sum"))
.append("元").append(
"</font>").append("\n\n")
.append("</font>");
});
//==================这是耗时作业=====================
String timeTopnSql = "select * from time_topn where ds='" + day + "' ORDER BY cost_time DESC limit 5;";
Instance timeInstance = SQLTask.run(odps, timeTopnSql);
timeInstance.waitForSuccess();
ResultSet timeTopnRecords = SQLTask.getResultSet(timeInstance);
sb.append("<font color=#FF8C00 size=4>").append("耗时作业TOPN(").append(day).append(")")
.append("\n\n").append("</font>");
AtomicInteger timeIndex = new AtomicInteger(1);
timeTopnRecords.forEach(item -> {
sb.append(timeIndex.getAndIncrement()).append(".").append("作业:");
sb.append("<font color=#2E64FE>").append(item.getString("inst_id")).append("\n\n").append("</font>");
sb.append(" ").append("账号:").append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>");
sb.append(" ").append("耗时:").append("<font color=#2E64FE>").append(item.get("cost_time"))
.append("秒").append(
"</font>").append("\n\n");
});
} catch (OdpsException | IOException e) {
e.printStackTrace();
}
return sb.toString();
}
/**
* 创建ODPS
*/
public static Odps createOdps() {
String project = "<project_name>";
String access_id = "<AccessKey_id>";
String access_key = "<AccessKey_Secret>";
String endPoint = "http://service.odps.aliyun.com/api";
Account account = new AliyunAccount(access_id, access_key);
Odps odps = new Odps(account);
odps.setEndpoint(endPoint);
odps.setDefaultProject(project);
return odps;
}
}
说明 自定义钉钉群机器人开发API,请参见
机器人开发。
- 上传生成的topn_new.jar包为MaxCompute资源。
- 创建Shell节点(dingsend),引用topn_new.jar包并配置定时调度。
创建Shell节点操作,请参见创建Shell节点。
Shell节点运行的命令示例如下:
java -jar topn_new.jar $1
$1
为DataWorks的调度参数,您需要在Shell节点右侧,单击调度配置,在基础属性区域配置参数值为${yyyymmdd}
。
步骤六:配置上下游节点调度属性并运行节点
在业务流程面板将information_history、topn和dingsend节点连线形成依赖关系,并配置每个节点的重跑属性和依赖的上游节点。配置完成后在节点上单击右键,选择运行节点即可。
依赖关系配置,请参见依赖关系。
节点上下游配置,请参见节点上下文。
效果展示
钉钉群推送内容效果如下,仅供参考。
在线支持
欢迎您扫码加入MaxCompute开发者社区群以获得更加专业、高效的支持。
在文档使用中是否遇到以下问题
更多建议
匿名提交