恶意文件检测SDK功能依托云安全中心多引擎检测平台,可识别离线文件和阿里云OSS文件中存在的常见病毒,例如勒索病毒、挖矿程序等,防止恶意文件传播和执行。本文介绍如何使用恶意文件检测SDK功能。
功能说明
恶意文件检测SDK为云端检测方案,使用该功能会将您的文件上传到云端进行检测。
支持以下两种使用方式:
调用SDK检测离线文件:通过SDK接入的方式检测恶意文件,在返回结果中获取恶意文件信息。支持通过Java或Python方式接入。
云安全中心控制台检测OSS文件:在云安全中心控制台执行对阿里云对象存储Bucket内文件的检测,并支持查看存在风险的文件列表。
支持对压缩包文件解压并进行检测。
在调用SDK检测离线文件场景下,默认不解压,需要您自行配置解压,扫描时可以设置是否识别压缩文件并解压、最大解压层级和最大解压文件数。
在OSS文件检测场景下,默认不解压,需要您自行配置解压,支持全局和单个Bucket粒度下配置压缩包的解压层级。
使用限制
一次恶意文件检测仅可以检测一个不超过20 MB的文件。
试用版和企业版的默认接口请求频率不同。
试用版:10次/秒。
企业版:20次/秒。
支持检测的压缩包文件类型有
.7z
、.zip
、.tar
、.gz
、.rar
、.ar
、.bz2
、.xz
和.lzma
。一个压缩包支持解压的层级最多为5层,解压后的文件总个数最多为1,000个,解压后的所有文件总大小最多为1 GB。对于超出限制范围的文件,不会被检测。
恶意文件的检测速度会受到网速、计算机性能、云产品限制等方面的影响。恶意文件检测SDK内部采用队列的方式,缓解外部峰值时的请求来提高并发率。当内部的队列满时,拒绝外部继续提交请求,并使用等待接口,等待队列有空间可用时再处理外部请求。
您可以通过增加队列长度来提高并发量,该方法会影响部分样本的检测时长。
timeout_ms
参数为样本超时时间,单位为毫秒。为了减少超时,建议您将timeout_ms
参数设置为60,000毫秒(即60秒)。
计费说明
使用恶意文件检测SDK功能会消耗文件检测次数。如果是压缩包文件,每个压缩包文件消耗的文件检测次数按照解压后文件个数计算。
经过企业实名认证的阿里云账号可以免费试用恶意文件检测SDK功能。试用版提供10,000次恶意文件检测次数。
购买企业版可以选择您所需的恶意文件检测次数。
支持检测病毒类型
前提条件
如果您使用的是RAM用户,请确保已为RAM用户授予AliyunYundunSASFullAccess权限。具体操作,请参见为RAM用户授权。
开通服务
云安全中心支持通过免费试用和付费购买的方式开通服务。
免费试用
如果您的阿里云账号已通过企业认证,您可以通过免费试用开通恶意文件检测SDK服务,并获取1万次恶意文件检测次数。每个阿里云账号仅有一次免费试用机会。
登录云安全中心控制台。在控制台左上角,选择需防护资产所在的区域中国。
在左侧导航栏,选择 。
在恶意文件检测SDK页面,单击立即试用。
付费购买
如果免费试用无法满足需求,您可以通过付费购买的方式开通恶意文件检测SDK服务。
如果存在未使用的免费试用次数,付费购买次数后,剩余的免费试用次数将累计在您付费购买的次数中。
登录云安全中心控制台。在控制台左上角,选择需防护资产所在的区域中国。
在左侧导航栏,选择 。
在恶意文件检测SDK页面,单击立即购买。在购买面板,选择恶意文件检测SDK为是,并按照要检测文件的数量购买足够的恶意文件检测次数。
如果您已购买云安全中心付费版本,您可以直接选择所需的恶意文件检测次数。如未购买,请根据您的需求选择云安全中心版本:
无需使用云安全中心的其他安全防护功能时,版本选择仅采购增值服务。
如需使用云安全中心的其他功能,例如漏洞修复、容器威胁检测,请选择相应的云安全中心版本。各版本的功能差异详情,请参见功能特性。
仔细阅读并选中服务协议,单击立即购买并完成支付。
开通恶意文件检测SDK服务后,您可以在恶意文件检测SDK页面,查看您的恶意文件检测SDK的剩余检测次数。
调用SDK检测恶意文件
准备工作
已配置环境变量
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
。阿里云SDK支持通过定义
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
环境变量来创建默认的访问凭证。调用接口时,程序直接访问凭证,读取您的访问密钥(即AccessKey)并自动完成鉴权。更多信息,请参见身份验证配置。您需要选择接入方式并参考下表的说明获取SDK包。
接入方式
版本要求
获取SDK包
Java接入
必须使用1.8或更高版本的JDK。
您可以通过以下方式获取Java SDK。
离线导入安装:在联网环境下访问Java SDK代码库并下载Java SDK,将下载的Java SDK添加到项目工程中。
Python接入
Python 3.6及以上版本。
您可以通过以下方式获取部署SDK包。
使用pip快速安装(联网环境下推荐):
pip install -U alibabacloud_filedetect
离线安装(无互联网连接的环境):在联网环境下访问Python代码库并下载Python SDK。将Python SDK上传至项目工程环境,解压压缩包后,运行以下安装命令:
# 切换至python SDK根目录 cd alibabacloud-file-detect-python-sdk-master # 安装SDK,注意python的版本 python setup.py install
示例代码
使用Python接入时,请替换下面示例中的测试用例路径参数,即path
。
package com.aliyun.filedetect.sample;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import com.aliyun.filedetect.*;
public class Sample {
/**
* 同步检测文件接口
* @param detector 检测器对象
* @param path 待检测的文件路径
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @throws InterruptedException
*/
public static DetectResult detectFileSync(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == path) return null;
DetectResult result = null;
while(true) {
result = detector.detectSync(path, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 异步检测文件接口
* @param detector 检测器对象
* @param path 待检测的文件路径
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @param callback 结果回调函数
* @throws InterruptedException
*/
public static int detectFile(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == path || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detect(path, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 同步检测URL文件接口
* @param detector 检测器对象
* @param url 待检测的文件URL
* @param md5 待检测的文件md5
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @throws InterruptedException
*/
public static DetectResult detectUrlSync(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == url || null == md5) return null;
DetectResult result = null;
while(true) {
result = detector.detectUrlSync(url, md5, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 异步检测URL文件接口
* @param detector 检测器对象
* @param url 待检测的文件URL
* @param md5 待检测的文件md5
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @param callback 结果回调函数
* @throws InterruptedException
*/
public static int detectUrl(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == url || null == md5 || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detectUrl(url, md5, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 格式化检测结果
* @param result 检测结果对象
* @return 格式化后的字符串
*/
public static String formatDetectResult(DetectResult result) {
if (result.isSucc()) {
DetectResult.DetectResultInfo info = result.getDetectResultInfo();
String msg = String.format("[DETECT RESULT] [SUCCEED] %s", formatDetectResultInfo(info));
if (info.compresslist != null) {
int idx = 1;
for (DetectResult.CompressFileDetectResultInfo comp_res : info.compresslist) {
msg += String.format("\n\t\t\t [COMPRESS FILE] [IDX:%d] %s", idx++, formatCompressFileDetectResultInfo(comp_res));
}
}
return msg;
}
DetectResult.ErrorInfo info = result.getErrorInfo();
return String.format("[DETECT RESULT] [FAIL] md5: %s, time: %d, error_code: %s, error_message: %s"
, info.md5, info.time, info.error_code.name(), info.error_string);
}
private static String formatDetectResultInfo(DetectResult.DetectResultInfo info) {
String msg = String.format("MD5: %s, TIME: %d, RESULT: %s, SCORE: %d", info.md5, info.time, info.result.name(), info.score);
if (info.compresslist != null) {
msg += String.format(", COMPRESS_FILES: %d", info.compresslist.size());
}
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
private static String formatCompressFileDetectResultInfo(DetectResult.CompressFileDetectResultInfo info) {
String msg = String.format("PATH: %s, \t\t RESULT: %s, SCORE: %d", info.path, info.result.name(), info.score);
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
/**
* 同步检测目录或文件
* @param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
* @param is_sync 是否使用同步接口,推荐使用异步。 true是同步, false是异步
* @throws InterruptedException
*/
public static void detectDirOrFileSync(OpenAPIDetector detector, String path, int timeout_ms, Map<String, DetectResult> result_map) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFileSync(detector, subpath, timeout_ms, result_map);
}
return;
}
System.out.println(String.format("[detectFileSync] [BEGIN] queueSize: %d, path: %s, timeout: %d", detector.getQueueSize(), abs_path, timeout_ms));
DetectResult res = detectFileSync(detector, abs_path, timeout_ms, true);
System.err.println(String.format(" [ END ] %s", formatDetectResult(res)));
result_map.put(abs_path, res);
}
/**
* 异步检测目录或文件
* @param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
* @param is_sync 是否使用同步接口,推荐使用异步。 true是同步, false是异步
* @throws InterruptedException
*/
public static void detectDirOrFile(OpenAPIDetector detector, String path, int timeout_ms, IDetectResultCallback callback) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFile(detector, subpath, timeout_ms, callback);
}
return;
}
int seq = detectFile(detector, abs_path, timeout_ms, true, callback);
System.out.println(String.format("[detectFile] [BEGIN] seq: %d, queueSize: %d, path: %s, timeout: %d", seq, detector.getQueueSize(), abs_path, timeout_ms));
}
/**
* 开始对文件或目录进行
* @param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
* @param is_sync 是否使用同步接口,推荐使用异步。 true是同步, false是异步
* @throws InterruptedException
*/
public static void scan(final OpenAPIDetector detector, String path, int detect_timeout_ms, boolean is_sync) throws InterruptedException {
System.out.println(String.format("[SCAN] [START] path: %s, detect_timeout_ms: %d, is_sync: %b", path, detect_timeout_ms, is_sync));
long start_time = System.currentTimeMillis();
final Map<String, DetectResult> result_map = new HashMap<>();
if (is_sync) {
detectDirOrFileSync(detector, path, detect_timeout_ms, result_map);
} else {
detectDirOrFile(detector, path, detect_timeout_ms, new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
System.err.println(String.format("[detectFile] [ END ] seq: %d, queueSize: %d, %s", seq, detector.getQueueSize(), formatDetectResult(callback_res)));
result_map.put(file_path, callback_res);
}
});
// 等待任务执行完成
detector.waitQueueEmpty(-1);
}
long used_time = System.currentTimeMillis() - start_time;
System.out.println(String.format("[SCAN] [ END ] used_time: %d, files: %d", used_time, result_map.size()));
int fail_count = 0;
int white_count = 0;
int black_count = 0;
for (Map.Entry<String, DetectResult> entry : result_map.entrySet()) {
DetectResult res = entry.getValue();
if (res.isSucc()) {
if (res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK) {
black_count ++;
} else {
white_count ++;
}
} else {
fail_count ++;
}
}
System.out.println(String.format(" fail_count: %d, white_count: %d, black_count: %d"
, fail_count, white_count, black_count));
}
public static void main(String[] args_) throws Exception {
// 获取检测器实例
OpenAPIDetector detector = OpenAPIDetector.getInstance();
// 初始化
ERR_CODE init_ret = detector.init(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
System.out.println("INIT RET: " + init_ret.name());
// 设置解压缩参数(可选,默认不解压压缩包)
boolean decompress = true; // 是否识别压缩文件并解压,默认为false
int decompressMaxLayer = 5; // 最大解压层数,decompress参数为true时生效
int decompressMaxFileCount = 1000; // 最大解压文件数,decompress参数为true时生效
ERR_CODE initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount);
System.out.println("INIT_DECOMPRESS RET: " + initdec_ret.name());
if (true) {
// 示例用法1:扫描本地目录或文件
boolean is_sync_scan = false; // 是异步检测还是同步检测。异步检测性能更好。false表示异步检测
int timeout_ms = 500000; // 单个样本检测时间,单位为毫秒
String path = "test2.php"; // 待扫描的文件或目录
// 启动扫描,直到扫描结束
scan(detector, path, timeout_ms, is_sync_scan);
}
if (true) {
// 示例用法2:扫描URL文件
int timeout_ms = 500000; // 单个样本检测时间,单位为毫秒
String url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1*****25&OSSAccessKeyId=xxx"; // 待扫描的URL文件
String md5 = "a767f*************6e21d000000"; // 待扫描的文件MD5
// 同步扫描。如果需要异步扫描,调用detectUrl接口
System.out.println(String.format("[detectUrlSync] [BEGIN] URL: %s, MD5: %s, TIMEOUT: %d", url, md5, timeout_ms));
DetectResult result = detectUrlSync(detector, url, md5, timeout_ms, true);
System.err.println(String.format("[detectUrlSync] [ END ] %s", formatDetectResult(result)));
}
// 反初始化
System.out.println("Over.");
detector.uninit();
}
}
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
import threading
import time
import traceback
from alibabacloud_filedetect.OpenAPIDetector import OpenAPIDetector
from alibabacloud_filedetect.IDetectResultCallback import IDetectResultCallback
from alibabacloud_filedetect.ERR_CODE import ERR_CODE
from alibabacloud_filedetect.DetectResult import DetectResult
class Sample(object):
def __init__(self):
pass
"""
同步检测文件接口
@param detector 检测器对象
@param path 待检测的文件路径
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,False表示不等待直接返回错误,True表示一直等待直到队列不满时
"""
def detectFileSync(self, detector, path, timeout_ms, wait_if_queuefull):
if detector is None or path is None:
return None
result = None
while True:
result = detector.detectSync(path, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
异步检测文件接口
@param detector 检测器对象
@param path 待检测的文件路径
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,False表示不等待直接返回错误,True表示一直等待直到队列不满时
@param callback 结果回调函数
"""
def detectFile(self, detector, path, timeout_ms, wait_if_queuefull, callback):
if detector is None or path is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detect(path, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
同步检测URL文件接口
@param detector 检测器对象
@param url 待检测的文件URL
@param md5 待检测的文件md5
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
"""
def detectUrlSync(self, detector, url, md5, timeout_ms, wait_if_queuefull):
if detector is None or url is None or md5 is None:
return None
result = None
while True:
result = detector.detectUrlSync(url, md5, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
异步检测URL文件接口
@param detector 检测器对象
@param url 待检测的文件URL
@param md5 待检测的文件md5
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
@param callback 结果回调函数
"""
def detectUrl(self, detector, url, md5, timeout_ms, wait_if_queuefull, callback):
if detector is None or url is None or md5 is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detectUrl(url, md5, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
格式化检测结果
@param result 检测结果对象
@return 格式化后的字符串
"""
@staticmethod
def formatDetectResult(result):
msg = ""
if result.isSucc():
info = result.getDetectResultInfo()
msg = "[DETECT RESULT] [SUCCEED] {}".format(Sample.formatDetectResultInfo(info))
if info.compresslist is not None:
idx = 1
for comp_res in info.compresslist:
msg += "\n\t\t\t [COMPRESS FILE] [IDX:{}] {}".format(idx, Sample.formatCompressFileDetectResultInfo(comp_res))
idx += 1
else:
info = result.getErrorInfo()
msg = "[DETECT RESULT] [FAIL] md5: {}, time: {}, error_code: {}, error_message: {}".format(info.md5,
info.time, info.error_code.name, info.error_string)
return msg
@staticmethod
def formatDetectResultInfo(info):
msg = "MD5: {}, TIME: {}, RESULT: {}, SCORE: {}".format(info.md5, info.time, info.result.name, info.score)
if info.compresslist is not None:
msg += ", COMPRESS_FILES: {}".format(len(info.compresslist))
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
@staticmethod
def formatCompressFileDetectResultInfo(info):
msg = "PATH: {}, \t\t RESULT: {}, SCORE: {}".format(info.path, info.result.name, info.score)
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
"""
同步检测目录或文件
@param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
@param is_sync 是否使用同步接口,推荐使用异步。 True是同步,False是异步
"""
def detectDirOrFileSync(self, detector, path, timeout_ms, result_map):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFileSync(detector, sub_path, timeout_ms, result_map)
return
print("[detectFileSync] [BEGIN] queueSize: {}, path: {}, timeout: {}".format(
detector.getQueueSize(), abs_path, timeout_ms))
res = self.detectFileSync(detector, abs_path, timeout_ms, True)
print(" [ END ] {}".format(Sample.formatDetectResult(res)))
result_map[abs_path] = res
return
"""
异步检测目录或文件
@param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
@param is_sync 是否使用同步接口,推荐使用异步。True是同步, False是异步
"""
def detectDirOrFile(self, detector, path, timeout_ms, callback):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFile(detector, sub_path, timeout_ms, callback)
return
seq = self.detectFile(detector, abs_path, timeout_ms, True, callback)
print("[detectFile] [BEGIN] seq: {}, queueSize: {}, path: {}, timeout: {}".format(
seq, detector.getQueueSize(), abs_path, timeout_ms))
return
"""
开始对文件或目录进行检测
@param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
@param is_sync 是否使用同步接口,推荐使用异步。 True是同步,False是异步
"""
def scan(self, detector, path, detect_timeout_ms, is_sync):
try:
print("[SCAN] [START] path: {}, detect_timeout_ms: {}, is_sync: {}".format(path, detect_timeout_ms, is_sync))
start_time = time.time()
result_map = {}
if is_sync:
self.detectDirOrFileSync(detector, path, detect_timeout_ms, result_map)
else:
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
print("[detectFile] [ END ] seq: {}, queueSize: {}, {}".format(seq,
detector.getQueueSize(), Sample.formatDetectResult(callback_res)))
result_map[file_path] = callback_res
self.detectDirOrFile(detector, path, detect_timeout_ms, AsyncTaskCallback())
# 等待任务执行完成
detector.waitQueueEmpty(-1)
used_time_ms = (time.time() - start_time) * 1000
print("[SCAN] [ END ] used_time: {}, files: {}".format(int(used_time_ms), len(result_map)))
failed_count = 0
white_count = 0
black_count = 0
for file_path, res in result_map.items():
if res.isSucc():
if res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK:
black_count += 1
else:
white_count += 1
else:
failed_count += 1
print(" fail_count: {}, white_count: {}, black_count: {}".format(
failed_count, white_count, black_count))
except Exception as e:
print(traceback.format_exc(), file=sys.stderr)
def main(self):
# 获取检测器实例
detector = OpenAPIDetector.get_instance()
# 读取环境变量中的Access Key ID和Access Key Secret
access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
# 初始化
init_ret = detector.init(access_key_id, access_key_secret)
print("INIT RET: {}".format(init_ret.name))
# 设置解压缩参数(可选,默认不解压压缩包)
decompress = True # 是否识别压缩文件并解压,默认为false
decompressMaxLayer = 5 # 最大解压层数,decompress参数为true时生效
decompressMaxFileCount = 1000 # 最大解压文件数,decompress参数为true时生效
initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount)
print("INIT_DECOMPRESS RET: {}".format(initdec_ret.name))
if True:
# 示例用法1:扫描本地目录或文件
is_sync_scan = False # 是异步检测还是同步检测。异步检测性能更好。False表示异步检测
timeout_ms = 500000 # 单个样本检测时间,单位为毫秒
path = "test.bin" # 待扫描的文件或目录
# 启动扫描,直到扫描结束
self.scan(detector, path, timeout_ms, is_sync_scan)
if True:
# 示例用法2:扫描URL文件
timeout_ms = 500000
url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1671448125&OSSAccessKeyId=xxx" # 待扫描的URL文件
md5 = "a767ffc59d93125c7505b6e21d000000"
# 同步扫描。如果需要异步扫描,调用detectUrl接口
print("[detectUrlSync] [BEGIN] URL: {}, MD5: {}, TIMEOUT: {}".format(url, md5, timeout_ms))
result = self.detectUrlSync(detector, url, md5, timeout_ms, True)
print("[detectUrlSync] [ END ] {}".format(Sample.formatDetectResult(result)))
# 反初始化
print("Over.")
detector.uninit()
if __name__ == "__main__":
sample = Sample()
sample.main()
返回结果
调用SDK进行恶意文件检测后,只能在程序的运行结果中查看检测结果,检测结果不会同步到云安全中心控制台中。控制台中仅支持查看剩余检测次数。风险文件总览页面的统计数据,例如检测文件总数等,为OSS文件检测的结果。
struct DetectResult {
std::string md5; // 样本md5
long time = 0; // 用时,单位为毫秒
ERR_CODE error_code; // 错误码
std::string error_string; // 扩展错误信息
enum RESULT {
RES_WHITE = 0, //安全文件
RES_BLACK = 1, //可疑文件
RES_PENDING = 3 //检测中
};
RESULT result; //检测结果
int score; //检测分值,取值范围0~100
std::string virus_type; //病毒类型,如“WebShell/MalScript/Hacktool”
std::string ext_info; //扩展信息为JSON字符串
struct CompressFileDetectResultInfo {
std::string path; // 压缩文件路径
RESULT result; // 检测结果
int score; // 分值,取值范围0-100
std::string virus_type; //病毒类型,如“WebShell/MalScript/Hacktool”
std::string ext_info; //扩展信息为JSON字符串
};
std::list<struct CompressFileDetectResultInfo> compresslist = null; // 如果是压缩包,并且开启了压缩包解压参数,则此处会输出压缩包内文件检测结果
};
错误码说明如下:
enum ERR_CODE {
ERR_INIT = -100, // 需要初始化,或者重复初始化
ERR_FILE_NOT_FOUND = -99, // 文件未找到
ERR_DETECT_QUEUE_FULL = -98, // 检测队列满
ERR_CALL_API = -97, // 调用API错误
ERR_TIMEOUT = -96, // 超时
ERR_UPLOAD = -95, //文件上传失败;用户可重新发起检测,再次尝试
ERR_ABORT = -94, //程序退出,样本未得到检测
ERR_TIMEOUT_QUEUE = -93, //队列超时,用户发起检测频率过高或超时时间过短
ERR_MD5 = -92, // MD5格式不对
ERR_URL = -91, // URL格式不对
ERR_SUCC = 0 // 成功
};
检测分值越高,文件可能存在的风险越高。检测分值和危险等级对应表如下:
分数区间 | 危险等级 |
0~60 | 安全 |
61~70 | 风险 |
71~80 | 可疑 |
81~100 | 恶意 |
检测OSS文件
执行Bucket文件检测
执行Bucket文件检测前,请确保当前阿里云账号下有足够的剩余检测次数。如果剩余检测次数不足,您可以在恶意文件检测SDK页面的风险文件总览页签下,单击升级配置购买足够的检测次数。
登录云安全中心控制台。在控制台左上角,选择需防护资产所在的区域中国。
在左侧导航栏,选择 。
单击OSS文件检测页签,选择合适的检测方式并执行检测。
如果您的Bucket没有在OSS文件检测的列表中,您可以单击同步Bucket,同步最新的Bucket列表。
检测方式
说明
操作步骤
手动全量检测
检测单个或多个Bucket内的所有文件。
在OSS文件检测页签,单击单个Bucke操作列的检测或选中多个Bucket后单击批量检测。
在检测对话框,指定需要检测文件的类型、扫描路径。
如果选择检测的文件类型包含压缩类型,可以设置解压层级(最多5层,支持不解压)和单个压缩包解压文件数量限制(最多1000个)。
单击确定。
手动增量检测
针对已检测且上次检测后有文件更新的Bucket,提供只检测未检测过的文件的功能。
在OSS文件检测页签,单击目标Bucke操作列的增量检测。
在增量检测对话框中,指定需要检测文件的类型、解压层级(压缩类型文件支持)、扫描路径。
单击确定。
自动检测
通过配置扫描策略可为指定Bucket开启周期性自动检测。配置前建议了解以下信息:
一个Bucket只能在一个策略中生效。
自动检测仅会检测OSS新增的文件,不会重复检测同一个文件。
在OSS文件检测页签,单击策略管理区域的策略配置。
在策略管理面板,单击新增策略。
如果已配置策略的检测方案符合检测要求,您可以单击该策略操作列的编辑,将目标Bucket添加到该策略的生效范围内。
在策略创建面板,配置策略名称、检测周期、文件检测时间、文件检测类型、解压层级(压缩类型文件支持)和生效Bucket等。
单击确定。
查看检测结果
在风险文件总览页签:
在风险文件列表上方,查看已检测的OSS文件总数、存在不同等级风险(高危、中危、低危)的文件数量和剩余检测次数。
在风险文件列表中,单击目标风险文件操作列的详情,在该文件的详情面板,查看检测出的存在风险的文件详情、事件说明等信息。
如果是压缩包文件,可单击文件前的展开图标,展示该压缩包下的风险文件列表,该压缩包文件的风险等级为其下风险文件中的最高风险等级。
在压缩包文件的风险详情面板,还可查看该压缩包中的风险文件数、解压后检测的文件总数,以及风险文件列表等信息。
重要检测出风险的文件可能被植入了恶意脚本、DDoS木马等病毒,建议您根据事件说明中的处置建议及时处理该文件。
在OSS文件检测页签
在Bucket列表上方,查看检测出恶意文件的Bucket数、存在不同等级风险(高危、中危、低危)的文件数量、未检测Bucket数量和Bucket总数。
在Bucket列表中,单击目标Bucke操作列的详情,查看Bucket的基本信息和风险文件详情。
在压缩包的文件检测详情页面,还可以查看是否配置文件解压、解压后的文件总数、检测的文件数量以及已扫描、未扫描的风险文件列表。
导出检测结果
导出所有风险文件列表
在风险文件总览页签,单击图标,等待文件导出完成后,在提示框中单击下载。
按Bucket维度导出所有风险文件列表
在OSS文件检测页签,单击图标,等待文件导出完成后,在提示框中单击下载。
相关文档
您可以通过调用API接口方式使用恶意文件检测功能。具体使用方法如下:
- 本页导读 (1)