Asynchronous submission

Syntax:

submit job insert overwrite into xxx select ...

This command returns a job_id, for example:

mysql> submit job insert overwrite into test select * from test_external_table;
+---------------------------------------+
| job_id                                |
+---------------------------------------+
| 2017112122202917203100908203303000715 |
+---------------------------------------+
1 row in set (1.77 sec)
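
From a script, the returned job_id can be captured directly. A minimal sketch using the mysql command-line client and the same placeholder connection values as the scheduling script later in this article (-N suppresses the column header, -s switches to silent tab-separated output):

# Submit the import asynchronously and keep only the returned job_id.
jobid=$(mysql -h"xxx.petadata.rds.aliyuncs.com" -P"3303" -u"cstore" -p"cstore" "testdb" \
        -N -s -e "submit job insert overwrite into cstore_test select * from odps_test_external_table")
echo "submitted job: ${jobid}"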

Querying job status

Syntax:

show job status where job='xxx'

For example:

mysql> show job status where job='2017112122202917203100908203303000715';
+---------------------------------------+-------------+---------+----------+-----------------------+-----------------------+--------------------------------------+
| job_id                                | schema_name | status  | fail_msg | create_time           | update_time           | definition                           |
+---------------------------------------+-------------+---------+----------+-----------------------+-----------------------+--------------------------------------+
| 2017112122202917203100908203303000715 | test        | RUNNING | NULL     | 2017-11-21 22:20:31.0 | 2017-11-21 22:20:40.0 |  insert into test select * from test |
+---------------------------------------+-------------+---------+----------+-----------------------+-----------------------+--------------------------------------+
1 row in set (0.35 sec)
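
When polling from a script it is usually enough to extract the status column. A minimal sketch (same placeholder connection values as above); with -N -s the row comes back tab-separated, and since neither job_id nor schema_name contains whitespace the status is the third field:

# Query the job and keep only its status (e.g. RUNNING / FINISH / FAILED).
status=$(mysql -h"xxx.petadata.rds.aliyuncs.com" -P"3303" -u"cstore" -p"cstore" "testdb" \
         -N -s -e "show job status where job = '${jobid}'" | awk '{print $3}')
echo "job ${jobid} is ${status}"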

Job states:

INIT -> SUBMITTED -> RUNNING -> FINISH | FAILED
queued -> scheduled and submitted -> executing in the background -> succeeded or failed

Canceling a job

Syntax:

cancel job 'xxx'

For example:

mysql> cancel job '2017112122202917203100908203303000715';
Query OK, 1 row affected (0.02 sec)

Notes:

  • Jobs that have not yet been scheduled are removed from the queue.
  • Running jobs are terminated, and any data already imported is rolled back.
  • Completed jobs (succeeded or failed) are also removed.
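
Cancellation can also be issued from a script, for example to clean up an import that has run past a deadline. A minimal sketch, reusing the placeholder connection values from the sketches above:

# Terminate the job; as noted above, any data already imported is rolled back.
mysql -h"xxx.petadata.rds.aliyuncs.com" -P"3303" -u"cstore" -p"cstore" "testdb" \
      -e "cancel job '${jobid}'"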

How to schedule periodically with D2/DataWorks

Create a new Shell task in D2/DataWorks, and use it to run the import on the D2/DataWorks scheduler.

The following example is the Shell script that performs the MaxCompute import:

#!/usr/bin/env bash

host="xxx.petadata.rds.aliyuncs.com" ## your instance dns
port="3303"  ## your instance vport
user="cstore"
password="cstore"
database="testdb"
source_table="odps_test_external_table"
target_table="cstore_test"
timeout=86400   # time out after 24 hours


sql="submit job insert overwrite into ${target_table} select * from ${source_table}"
echo "exec SQL:$sql"

mysql -h"${host}" -P"${port}" -u"${user}" -p"${password}" "${database}" -e "${sql}" | egrep "201[0-9]+" > import_${target_table}_jobid.txt

echo "Returned jobid: $(cat import_${target_table}_jobid.txt)"

jobid=`cat import_${target_table}_jobid.txt`

check_finish_sql="show job status where job = '${jobid}'"
echo "Check Finish SQL:$check_finish_sql"

mysql -h"${host}" -P"${port}" -u"${user}" -p"${password}" "${database}" -e "${check_finish_sql}" | egrep "${jobid}" | awk '{print $3}' > import_${target_table}_result.txt

result=`cat import_${target_table}_result.txt`
begin_time=$(date "+%s")
echo "begin_time = "$begin_time

while true
do
    if [[ "$result" == "FINISH" ]]; then
        break;
    elif [[ "$result" == "FAILED" ]]; then
        echo "$jobid is failed, so exit"
        exit -1
    else
        echo "$jobid current status is $result"
    fi

    end_time=$(date "+%s")
    cost=$(($end_time - $begin_time))

    if [[ "$cost" -gt "$timeout" ]]
    then
        echo "$jobid has cost $cost second >= $timeout , so exit"
        exit -1
    else
        echo "$jobid is running using $cost seconds"
        sleep 30
    fi

    mysql -h"${host}" -P"${port}" -u"${user}" -p"${password}" "${database}" -e "${check_finish_sql}" | egrep "${jobid}" | awk '{print $3}' > import_${target_table}_result.txt
    result=`cat import_${target_table}_result.txt`

done

end_time=$(date "+%s")
cost=$(($end_time - $begin_time))
echo "$jobid has finished, using $cost seconds"