MAL,是MQTT Adapter Layer的简称。MAL组件将外部扩展模组中提供的MQTT协议,通过AT或其他命令的方式,为用户转换为AliOS Things系统中提供的统一MQTT协议API,提高用户应用程序的可移植性和硬件无关性。下图是AliOS Things中MAL套件的架构示意图。

其中,组件包括:

  • MAL Core:由AliOS Things提供,MAL核心组件。主要包括MQTT连接管理、数据缓存、协议转换等功能,对上提供MQTT API接口服务,对下提供统一的HAL接口规范(可以对接到不同厂商的AT模组)。
  • MAL Driver:驱动,部分由AliOS Things提供,其他由用户自己提供。MAL驱动模块基于具体型号的通信模组提供的AT命令,实现MAL规范的HAL接口功能。

API包括:

  • MQTT API:这一层接口提供MQTT标准接口,如subscribe、unsubscribe、publish等。
  • MAL HAL API:这一层接口定义MAL核心模块与不同厂商模组驱动之间的统一界面。这一层HAL的对接是模组驱动接入中的主要工作。

API列表

名称 说明
IOT_MQTT_Construct 云端建立MQTT连接
IOT_MQTT_Destroy 销毁指定MQTT连接并释放资源
IOT_MQTT_CheckStateNormal 获取当前MQTT连接状态
IOT_MQTT_Subscribe 向云端订阅指定的MQTT Topic
IOT_MQTT_Subscribe_Sync 向云端订阅指定的MQTT Topic,该接口为同步接口
IOT_MQTT_Unsubscribe 向云端取消订阅指定的topic
IOT_MQTT_Publish 向指定topic推送消息
IOT_MQTT_Publish_Simple 向指定topic推送消息,MQTT句柄可为NULL
mal_init MAL模块初始化,包括初始化底层驱动模块。
mal_add_dev 配置驱动参数,例如串口通信串口号、波特率等。

使用

添加该组件

步骤1,aos make menuconfig,选择Drivers步骤2,选择External module enable
步骤3,选择Exernal module Configurations,并选择MAL MODULE
步骤4,选择MAL device selection,并选择相应设备对接。

头文件

对外头文件代码位于include/network/mal,包括

mal.h
hal_mal.h

使用时只需包含mal.h,hal_mal.h在对接时使用

#include "mal/mal.h"

使用示例

pclient = IOT_MQTT_Construct(&mqtt_params);
 if (NULL == pclient) {
     EXAMPLE_TRACE("MQTT construct failed");
     return -1;
 }
res = IOT_MQTT_Subscribe_Sync(pclient, topic, IOTX_MQTT_QOS0, example_message_arrive, NULL);
if (res < 0) {
    EXAMPLE_TRACE("subscribe failed");
    HAL_Free(topic);
    return -1;
}

 while (1) {    
    int rc = IOT_MQTT_Publish(pclient, TOPIC_UPDATE, &topic_msg);
    if (rc < 0) {
       EXAMPLE_TRACE("IOT_MQTT_Publish fail, ret=%d", rc);
    }

    IOT_MQTT_Yield(pclient, 200);
}

更详细的例子请参考application/example/example_legacy/mal_app/

API 详情

IOT_MQTT_Construct

原型

void *IOT_MQTT_Construct(iotx_mqtt_param_t *pInitParams)

接口说明

与云端建立MQTT连接, 入参pInitParamsNULL时将会使用默认参数建连。

参数说明

参数 数据类型 方向 说明
pInitParams iotx_mqtt_param_t * 输入 MQTT初始化参数,填写NULL将以默认参数建连

返回值说明

说明
NULL 失败
非NULL MQTT句柄

接口示例

iotx_mqtt_param_t       mqtt_params;
iotx_sign_mqtt_t        sign_mqtt;

/* 生成签名 */
gen_aliyun_mqtt_sign(&sign_mqtt);

/* 配置MQTT连接参数 */
mqtt_params.host = sign_mqtt.hostname;
mqtt_params.port = sign_mqtt.port;
mqtt_params.client_id = sign_mqtt.clientid;
mqtt_params.username = sign_mqtt.username;
mqtt_params.password = sign_mqtt.password;
mqtt_params.request_timeout_ms = 2000;
mqtt_params.clean_session = 0;
mqtt_params.keepalive_interval_ms = 60000;
mqtt_params.write_buf_size = 1024;
mqtt_params.read_buf_size = 1024;
mqtt_params.handle_event.h_fp = example_event_handle;

/* 与云端建立MQTT连接 */
pclient = IOT_MQTT_Construct(&mqtt_params);
if (NULL == pclient) {
    return -1;
}

IOT_MQTT_Destroy

原型

int IOT_MQTT_Destroy(void **phandle);

接口说明

销毁指定MQTT连接并释放资源

参数说明

参数 数据类型 方向 说明
phandle void ** 输入 MQTT句柄,可为NULL

返回值说明

说明
0 成功
< 0 失败

接口示例

/* 与云端建立MQTT连接 * /
pclient = IOT_MQTT_Construct(&mqtt_params);

/* 销毁指定MQTT连接并释放资源 * /
IOT_MQTT_Destroy(&pclient);

IOT_MQTT_CheckStateNormal

原型

int IOT_MQTT_CheckStateNormal(void *handle);

接口说明

获取当前MQTT连接状态

参数说明

参数 数据类型 方向 说明
handle void * 输入 MQTT句柄,可为NULL

返回值说明

说明
0 未连接
1 已连接

接口示例

/* 获取当前MQTT连接状态 */
int state = IOT_MQTT_CheckStateNormal(pclient);
if (1 == state) {
    LOG("MQTT connected");
} else {
    LOG("MQTT disconnected");
}

IOT_MQTT_Yield

原型

int IOT_MQTT_Yield(void *handle, int timeout_ms);

接口说明

用于接收网络报文并将消息分发到用户的回调函数中

参数说明

参数 数据类型 方向 说明
handle void * 输入 MQTT句柄,可为NULL
timeout_ms int 输入 尝试接收报文的超时时间

返回值说明

说明
0 成功

接口示例

while (1) {
     /* 周期上报消息 */
    if (0 == loop_cnt % 20) {
       example_publish(pclient);
    }
    
    /* 接收网络报文并将消息分发到用户的回调函数 * /
    IOT_MQTT_Yield(pclient, 200);

    loop_cnt += 1;
}

IOT_MQTT_Subscribe

原型

int IOT_MQTT_Subscribe(void *handle,
                        const char *topic_filter,
                        iotx_mqtt_qos_t qos,
                        iotx_mqtt_event_handle_func_fpt topic_handle_func,
                        void *pcontext);

接口说明

向云端订阅指定的MQTT Topic

参数说明

参数 数据类型 方向 说明
handle void * 输入 MQTT句柄,可为NULL
topic_filter const char * 输入 需要订阅的topic
qos iotx_mqtt_qos_t 输入 采用的QoS策略
topic_handle_func iotx_mqtt_event_handle_func_fpt 输入 用于接收MQTT消息的回调函数
pcontext void * 输入 用户Context, 会通过回调函数送回

返回值说明

说明
0 成功
< 0 失败

接口示例

const char *fmt = "/%s/%s/user/get";
char *topic = NULL;
int topic_len = 0;

topic_len = strlen(fmt) + strlen(DEMO_PRODUCT_KEY) + strlen(DEMO_DEVICE_NAME) + 1;
topic = HAL_Malloc(topic_len); 
memset(topic, 0, topic_len);

/* 拼接主题 */
HAL_Snprintf(topic, topic_len, fmt, DEMO_PRODUCT_KEY, DEMO_DEVICE_NAME);

/* 订阅主题 */
res = IOT_MQTT_Subscribe_Sync(handle, topic, IOTX_MQTT_QOS0, example_message_arrive, NULL);
if (res < 0) {
    EXAMPLE_TRACE("subscribe failed");
    HAL_Free(topic);
    return -1;
}

HAL_Free(topic);

IOT_MQTT_Subscribe_Sync

原型

int IOT_MQTT_Subscribe_Sync(void *handle,
                            const char *topic_filter,
                            iotx_mqtt_qos_t qos,
                            iotx_mqtt_event_handle_func_fpt topic_handle_func,
                            void *pcontext,
                            int timeout_ms);

接口说明

向云端订阅指定的MQTT Topic, 该接口为同步接口

参数说明

参数 数据类型 方向 说明
handle void * 输入 MQTT句柄,可为NULL
topic_filter const char * 输入 需要订阅的topic
qos iotx_mqtt_qos_t 输入 采用的QoS策略
topic_handle_func iotx_mqtt_event_handle_func_fpt 输入 用于接收MQTT消息的回调函数
pcontext void * 输入 用户Context, 会通过回调函数送回
timeout_ms int 输入 该同步接口的超时时间

返回值说明

说明
0 成功
< 0 失败

接口示例

const char *fmt = "/%s/%s/user/get";
char *topic = NULL;
int topic_len = 0;

topic_len = strlen(fmt) + strlen(DEMO_PRODUCT_KEY) + strlen(DEMO_DEVICE_NAME) + 1;
topic = HAL_Malloc(topic_len); 
memset(topic, 0, topic_len);

/* 拼接主题 */
HAL_Snprintf(topic, topic_len, fmt, DEMO_PRODUCT_KEY, DEMO_DEVICE_NAME);

/* 同步订阅,5000毫秒超时 */
res = IOT_MQTT_Subscribe_Sync(handle, topic, IOTX_MQTT_QOS0, example_message_arrive, NULL,5000);
if (res < 0) {
    EXAMPLE_TRACE("subscribe failed");
    HAL_Free(topic);
    return -1;
}

HAL_Free(topic);

IOT_MQTT_Unsubscribe

原型

int IOT_MQTT_Unsubscribe(void *handle, const char *topic_filter);

接口说明

向云端取消订阅指定的topic

参数说明

参数 数据类型 方向 说明
handle void * 输入 MQTT句柄,可为NULL
topic_filter const char * 输入 需要取消订阅的topic

返回值说明

说明
0 成功
< 0 失败

接口示例

/* 订阅主题 */
IOT_MQTT_Subscribe(pclient, TOPIC_GET, IOTX_MQTT_QOS1, _demo_message_arrive, NULL);

/* 取消订阅 */
IOT_MQTT_Unsubscribe(pclient, TOPIC_GET);

IOT_MQTT_Publish

原型

int IOT_MQTT_Publish(void *handle, const char *topic_name, iotx_mqtt_topic_info_pt topic_msg);

接口说明

向指定topic推送消息

参数说明

参数 数据类型 方向 说明
handle void * 输入 MQTT句柄,可为NULL
topic_name const char * 输入 接收此推送消息的目标topic
topic_msg iotx_mqtt_topic_info_pt 输入 需要推送的消息

返回值说明

说明
> 0 成功(消息是QoS1时, 返回值就是这个上报报文的MQTT消息ID, 对应协议里的messageId)
0 成功(消息是QoS0时)
< 0 失败

接口示例

iotx_mqtt_topic_info_t topic_msg;

/* 组装消息 */
memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
topic_msg.qos = IOTX_MQTT_QOS1;
topic_msg.retain = 0;
topic_msg.dup = 0;
topic_msg.payload = (void *)ptopic_info->payload;
topic_msg.payload_len = ptopic_info->payload_len;

/* 向主题发布消息 */
int rc = IOT_MQTT_Publish(pclient, TOPIC_UPDATE, &topic_msg);
if (rc < 0) {
     EXAMPLE_TRACE("IOT_MQTT_Publish fail, ret=%d", rc);
}

IOT_MQTT_Publish_Simple

原型

int IOT_MQTT_Publish_Simple(void *handle, const char *topic_name, int qos, void *data, int len)

接口说明

向指定topic推送消息

参数说明

参数 数据类型 方向 说明
handle void * 输入 MQTT句柄,可为NULL
topic_name const char * 输入 接收此推送消息的目标topic
qos int 输入 采用的QoS策略
data void * 输入 需要发送的数据
len int 输入 数据长度

返回值说明

说明
> 0 成功(消息是QoS1时, 返回值就是这个上报报文的MQTT消息ID, 对应协议里的messageId)
0 成功(消息是QoS0时)
< 0 失败

接口示例

const char     *fmt = "/%s/%s/user/get";
char           *topic = NULL;
int             topic_len = 0;
char           *payload = "{\"message\":\"hello!\"}";

topic_len = strlen(fmt) + strlen(DEMO_PRODUCT_KEY) + strlen(DEMO_DEVICE_NAME) + 1;
topic = HAL_Malloc(topic_len);

/* 拼接topic字符串 */
HAL_Snprintf(topic, topic_len, fmt, DEMO_PRODUCT_KEY, DEMO_DEVICE_NAME);

/* 向指定topic推送消息 */
IOT_MQTT_Publish_Simple(0, topic, IOTX_MQTT_QOS0, payload, strlen(payload));
 
HAL_Free(topic);

mal_init

原型

int mal_init(void);

接口说明

MAL模块初始化,包括初始化底层驱动模块。

参数说明

返回值说明

说明
0 成功
负值 失败

接口示例

mal_device_config_t data = {0};
 
data.uart_dev.port = 1;
data.uart_dev.config.baud_rate = 115200;
data.uart_dev.config.data_width = DATA_WIDTH_8BIT;
data.uart_dev.config.parity = NO_PARITY;
data.uart_dev.config.stop_bits  = STOP_BITS_1;
data.uart_dev.config.flow_control = FLOW_CONTROL_DISABLED;
data.uart_dev.config.mode = MODE_TX_RX;

/* 配置驱动参数 */
if (mal_add_dev("sim800", &data) != 0) {
   LOG("Failed to add MAL device!");
   return -1;
}

/* 初始化MAL core */
mal_init();

mal_add_dev

原型

int mal_add_dev(char* driver_name, void* data);

接口说明

配置驱动参数,例如串口通信串口号、波特率等。

参数说明

参数 数据类型 方向 说明
driver_name char * 输入 驱动名称
data void* 输入 配置参数指针

返回值说明

说明
0 成功
负值 失败

接口示例

见mal_init示例

配置说明

MAL可配置项包括:

  • 最大topic长度,默认128
  • 最大消息长度,默认256
  • 是否定义WITH_MAL宏,默认定义WITH_MAL宏
  • 是否定义NO_TCPIP,默认定义NO_TCPIP宏

移植说明

MAL模块需要实现AT连接HAL、系统HAL。

AT连接HAL

MAL AT连接HAL函数定义见mal_op_t。

接口 描述
int (*add_dev)(void* data)

添加设备

data:设备参数

返回值:0成功,-1失败

int (*init)(iotx_mqtt_param_t *pInitParams)

初始化模组,使其处理准备连接状态

pInitParams:连接状态参数

返回值:0成功,-1失败

int (*connect)(char *proKey, char *devName, char *devSecret)

与远端建立MQTT连接

proKey:product key

devName:Device Name

devSecret:Device Secret

返回值:0成功,-1失败

int (*subscribe)(const char *topic, int qos, unsigned int *mqtt_packet_id, int *mqtt_status, int timeout_ms)

订阅主题

topic:主题字符串地址

qos:QoS等级

mqtt_packet_id:消息包ID

mqtt_status:MQTT消息状态

timeout_ms:超时时间

返回值:0成功,-1失败

int (*unsubscribe)(const char *topic, unsigned int *mqtt_packet_id, int *mqtt_status)

取消订阅主题

topic主题字符串地址

mqtt_packet_id:数据包ID

mqtt_status:MQTT消息状态

返回值:0成功,-1失败

int (*publish)(const char *topic, int qos, const char *message, unsigned int msg_len)

发布消息

topic:主题字符串地址

qos:QoS等级

message:消息地址

msg_len:消息长度

返回值:0成功,-1失败

int (*conn_state)(void)

查询连接状态

返回值参考iotx_mc_state_t

int (*disconn)(void)

断开连接

返回值:0成功,-1失败

int (*deinit)(void)

去初始化

返回值:0成功,-1失败

int (*register_mqtt_data_input_cb)(mqtt_data_input_cb_t cb)

注册MQTT数据接收回调

cb:回调函数

返回值:0成功,-1失败

系统wrapper

锁函数

函数名 说明
void *HAL_MutexCreate(void) 创建锁
void HAL_MutexLock(void *mutex) 上锁
void HAL_MutexUnlock(void *mutex) 开锁
void HAL_MutexDestroy(void *mutex) 删除锁

信号量函数

函数名 说明
void *HAL_SemaphoreCreate(void) 创建信号量
void HAL_SemaphorePost(void *sem) 释放信号量
int HAL_SemaphoreWait(void *sem, uint32_t timeout_ms) 等待信号量
void HAL_SemaphoreDestroy(void *sem) 删除信号量

时间函数

函数名 说明
void HAL_SleepMs(uint32_t ms) 睡眠函数
uint64_t HAL_UptimeMs(void) 获取系统时间

内存函数

函数名 说明
void *HAL_Malloc(uint32_t size) 分配内存
void HAL_Free(void *ptr) 释放内存

打印函数

函数名 说明
nt HAL_Snprintf(char *str, const int len, const char *fmt, ...) 格式化打印函数

标准宏和结构体说明

结构体iotx_mqtt_param_t定义

typedef struct {
    uint16_t                   port;
    const char                 *host;
    const char                 *client_id;
    const char                 *username;
    const char                 *password;
    const char                 *pub_key;
    const char                 *customize_info;
    uint8_t                    clean_session;
    uint32_t                   request_timeout_ms;
    uint32_t                   keepalive_interval_ms;
    uint32_t                   write_buf_size;
    uint32_t                   read_buf_size;
    iotx_mqtt_event_handle_t    handle_event;
} iotx_mqtt_param_t, *iotx_mqtt_param_pt;
  • port: 云端服务器端口
  • host: 云端服务器地址
  • client_id: MQTT客户端ID
  • username: 登录MQTT服务器用户名
  • password: 登录MQTT服务器密码
  • pub_key: MQTT连接加密方式及密钥
  • clean_session: 选择是否使用MQTT协议的clean session特性
  • request_timeout_ms: MQTT消息发送的超时时间
  • keepalive_interval_ms: MQTT心跳超时时间
  • write_buf_size: MQTT消息发送buffer最大长度
  • read_buf_size: MQTT消息接收buffer最大长度
  • handle_event: 用户回调函数, 用与接收MQTT模块的事件信息
  • customize_info: 用户自定义上报信息,是以逗号为分隔符kv字符串,如用户的厂商信息,模组信息自定义字符串为"pid=123456,mid=abcd";

pInitParams结构体的成员配置为0或NULL时将使用内部默认参数

结构体mal_op_t定义

typedef struct mal_op_s {
    struct mal_op_s * next;  //<! Next mal_op_t structure

    char *version; //<! Reserved for furture use.

    char *name; //<! Drvier name

    /**
     * Add mal device .
     *
     * @param[in]  data - device parameters
     *
     * @return  0 - success, -1 - failure
     */
    int (*add_dev)(void* data);

    /**
     * Module low level init so that it's ready to setup mqtt connection.
     *
     * @param[in]  pInitParams - connect parameters which are used to setup
     *                           the MQTT connection.
     * @return  0 - success, -1 - failure
     */
    int (*init)(iotx_mqtt_param_t *pInitParams);

    /**
     * Start a socket connection via module.
     *
     * @param[in]  proKey    - product key.
     * @param[in]  devName   - device name.
     * @param[in]  devSecret - device secret.
     *
     * @note: If the module does not accept the triple, ingore these parameters
     *
     * @return  0 - success, -1 - failure
     */
    int (*connect)(char *proKey, char *devName, char *devSecret);

   /**
    * Subscribe a topic via the MQTT connection
    *
    * @param[in]  topic          - MQTT topic.
    * @param[in]  qos            - QoS used.
    * @param[out] mqtt_packet_id - MQTT packet ID.
    * @param[out] mqtt_status    - MQTT subscribe status.
    * @param[out] timeout_ms     - time out in milliseconds.
    *
    * @return  0 - success, -1 - failure
    */
    int (*subscribe)(const char *topic, int qos, unsigned int *mqtt_packet_id, int *mqtt_status, int timeout_ms);

    /**
    * Unsubscribe a topic via the MQTT connection
    *
    * @param[in]  topic          - MQTT topic.
    * @param[out] mqtt_packet_id - MQTT packet ID.
    * @param[out] mqtt_status    - MQTT unsubscribe status.
    *
    * @return  0 - success, -1 - failure
    */
    int (*unsubscribe)(const char *topic, unsigned int *mqtt_packet_id, int *mqtt_status);

   /**
    * Publish MQTT message to the topic via module.
    *
    * @param[in]  topic - MQTT topic
    * @param[in]  qos   - quality of service used
    * @param[in]  message - message to be published
    * @param[in]  msg_len - message length
    *
    * @return  0 - success, -1 - failure
    */
    int (*publish)(const char *topic, int qos, const char *message, unsigned int msg_len);

    /**
    * Query AT MQTT connection status
    *
    * @return  MQTT status value.
    *          Refer to definition of iotx_mc_state_t in mal.h.
    */
    int (*conn_state)(void);

    /**
    * Disconnect the MQTT connection via module.
    *
    * @return  0 - success, -1 - failure
    */
    int (*disconn)(void);

    /**
     * Destroy MAL or exit low level state if necessary.
     *
     * @return  0 - success, -1 - failure
     */
    int (*deinit)(void);

    /**
     * Register mqtt data input function
     * Input data from mqtt module.
     * This callback should be called when mqtt data is received from the module
     * @param[in]  topic     - topic of the received message.
     * @param[in]  topic_len - length of topic.
     * @param[in]  message   - received message.
     * @param[in]  meg_len   - length of received message.
     *
     * @return  0 - success, -1 - failure
     */
    int (*register_mqtt_data_input_cb)(mqtt_data_input_cb_t cb);
} mal_op_t;

枚举iotx_mc_state_t定义

/* State of MQTT client */
typedef enum {
    IOTX_MC_STATE_INVALID = 0,                    /* MQTT in invalid state */
    IOTX_MC_STATE_INITIALIZED = 1,                /* MQTT in initializing state */
    IOTX_MC_STATE_CONNECTED = 2,                  /* MQTT in connected state */
    IOTX_MC_STATE_DISCONNECTED = 3,               /* MQTT in disconnected state */
    IOTX_MC_STATE_DISCONNECTED_RECONNECTING = 4,  /* MQTT in reconnecting state */
    IOTX_MC_STATE_CONNECT_BLOCK = 5               /* MQTT in connecting state when using async protocol stack */
} iotx_mc_state_t;