本文提供设备先使用HTTPS协议认证后,再通过MQTT协议接入物联网平台的示例代码。

设备可以通过MQTT协议直连接入物联网平台,但有的场景下,设备并不直连接入。物联网平台支持设备先使用HTTPS认证后,再通过MQTT接入。相关配置说明,请参见使用HTTPS认证再连接。设备认证成功,并接入物联网平台之后,开发消息收发和其他业务的方法与MQTT直连设备开发相同。

本文将提供HTTPS认证后再进行MQTT接入的示例。

pom.xml配置

pom.xml文件中,添加以下依赖引入Eclipse Paho包和阿里fastjson包。

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.1</version>
</dependency>

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.61</version>
</dependency>

下载根证书

使用TLS方式(securemode=2)接入,需要使用物联网平台根证书。请单击这里下载根证书。下载之后,请将根证书命名为root.crt,并放置到Java Maven工程的src/main/resources目录下(示例代码通过类路径/root.crt加载该证书)。

示例代码

/*   
 * Copyright © 2019 Alibaba. All rights reserved.
 */
package com.aliyun.iot.demo;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import com.alibaba.fastjson.JSONObject;

/**
 * Sample device client built on Eclipse Paho.
 *
 * <p>The device first authenticates against the HTTPS auth endpoint, then uses the
 * returned credentials (iotId / iotToken) and broker address to open an MQTT
 * connection over TLS. For the access flow and parameter details, see
 * "Authenticate over HTTPS, then connect":
 * https://help.aliyun.com/document_detail/73742.html
 *
 * <p>Failures are reported on standard output rather than thrown, matching the
 * demo nature of this class.
 */
public class IotMqttClientWithAuthByHttps {

    // Region ID of the region your product belongs to. For the ID format, see
    // https://help.aliyun.com/document_detail/40654.html
    private static final String REGION_ID = "cn-shanghai";

    // MAC algorithm used for signing; it is passed both to javax.crypto.Mac and to
    // the server as "signmethod". Options: HmacMD5, HmacSHA1, HmacSHA256.
    private static final String HMAC_ALGORITHM = "hmacsha1";

    // Connect/read timeout for the HTTPS auth request, so a stalled network
    // cannot block the caller forever.
    private static final int HTTP_TIMEOUT_MILLIS = 10000;

    // Upper-case hexadecimal digits used by byte2hex.
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    // MQTT client; stays null until connect(...) has created it.
    private MqttClient sampleClient = null;

    /**
     * Authenticates the device over HTTPS and, on success, opens the MQTT connection.
     *
     * <p>Any failure (network error, signing error, non-200 auth code, malformed
     * response) is printed and the method returns without connecting.
     *
     * @param productKey product key
     * @param deviceName device name
     * @param deviceSecret device secret
     */
    public void connect(String productKey, String deviceName, String deviceSecret) {

        // Client ID. Using the device MAC address or serial number is recommended;
        // at most 64 characters.
        String clientId = productKey + "." + deviceName;

        // Timestamp included in the signed parameters.
        long timestamp = System.currentTimeMillis();

        String broker;
        String mqttUsername;
        String mqttPassword;
        try {
            // Auth parameters. "sign" is computed before it is added, so the
            // signature never covers itself.
            JSONObject params = new JSONObject();
            params.put("clientId", clientId);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("timestamp", timestamp);
            params.put("resources", "mqtt");
            params.put("version", "default");
            params.put("signmethod", HMAC_ALGORITHM);
            params.put("sign", sign(params, deviceSecret));

            String result = auth(params, deviceSecret);

            // Parse the auth result. parseObject yields null for an empty body.
            JSONObject json = JSONObject.parseObject(result);
            if (json == null) {
                throw new IllegalStateException("empty auth response");
            }
            int code = json.getIntValue("code");
            if (code != 200) {
                System.out.println("https auth failed: productKey=" + productKey + ",deviceName=" + deviceName);
                System.out.println("https auth failed: message=" + json.getString("message"));
                return;
            }

            JSONObject data = json.getJSONObject("data");
            JSONObject resources = (data == null) ? null : data.getJSONObject("resources");
            JSONObject mqtt = (resources == null) ? null : resources.getJSONObject("mqtt");
            if (mqtt == null) {
                throw new IllegalStateException("auth response has no data.resources.mqtt section");
            }
            mqttUsername = data.getString("iotId");
            mqttPassword = data.getString("iotToken");
            String host = mqtt.getString("host");
            int port = mqtt.getIntValue("port");
            if (mqttUsername == null || mqttPassword == null || host == null || port <= 0) {
                throw new IllegalStateException("auth response is missing iotId, iotToken, host or port");
            }
            broker = "ssl://" + host + ":" + port;
        } catch (Exception e) {
            System.out.println("https auth failed: productKey=" + productKey + ",deviceName=" + deviceName);
            e.printStackTrace();
            return;
        }

        // Open the MQTT connection with the credentials issued by the auth service.
        connect(broker, clientId, mqttUsername, mqttPassword);
    }

    /**
     * Opens the MQTT connection over TLS.
     *
     * <p>The password is deliberately never written to the log output.
     *
     * @param serverURL broker address
     * @param clientId MQTT client ID
     * @param username MQTT user name
     * @param password MQTT password
     */
    protected void connect(String serverURL, String clientId, String username, String password) {
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            sampleClient = new MqttClient(serverURL, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setMqttVersion(4); // MQTT 3.1.1
            connOpts.setUserName(username);
            connOpts.setPassword(password.toCharArray());
            // TLS requires the downloaded root certificate root.crt (securemode=2).
            connOpts.setSocketFactory(createSSLSocket());
            // Keep the session: qos=1 messages sent while the device is offline are
            // stored in the cloud.
            connOpts.setCleanSession(false);
            // Automatic reconnect is disabled in this demo; enabling it is strongly
            // recommended in production.
            connOpts.setAutomaticReconnect(false);
            // Keep-alive interval; 300 seconds is recommended.
            connOpts.setKeepAliveInterval(300);
            // Register the callback before CONNECT. Registering it afterwards may
            // lose messages that arrive before the callback is ready.
            sampleClient.setCallback(new MqttCallback() {

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // Handle downstream messages here; doing the work on another
                    // thread is strongly recommended so this callback is not blocked.
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                }

                @Override
                public void connectionLost(Throwable cause) {
                    // Automatic reconnect is off, so make the drop visible.
                    System.out.println("connection lost: " + cause);
                }
            });
            System.out.println("Connecting to broker: " + serverURL);
            sampleClient.connect(connOpts);
            System.out.println("Connected: clientId=" + clientId + ",username=" + username);
        } catch (MqttException e) {
            System.out.println("connect failed: clientId=" + clientId + ",username=" + username);
            System.out.println("reason " + e.getReasonCode());
            System.out.println("msg " + e.getMessage());
            System.out.println("loc " + e.getLocalizedMessage());
            System.out.println("cause " + e.getCause());
            System.out.println("excep " + e);
            e.printStackTrace();
        } catch (Exception e) {
            System.out.println("connect exception: clientId=" + clientId + ",username=" + username);
            System.out.println("msg " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Publishes a UTF-8 text message with qos=0.
     *
     * @param topic topic to publish to
     * @param payload message content
     */
    public void publish(String topic, String payload) {
        byte[] content = payload.getBytes(StandardCharsets.UTF_8);
        publish(topic, 0, content);
    }

    /**
     * Publishes a message. If the client has not been connected, the message is
     * skipped and the fact is printed.
     *
     * @param topic topic to publish to
     * @param qos quality of service; the platform supports qos=0 and qos=1, not qos=2
     * @param payload message content
     */
    public void publish(String topic, int qos, byte[] payload) {
        MqttClient client = sampleClient;
        // connect(...) reports failures without throwing, so callers may reach this
        // point with no usable client.
        if (client == null || !client.isConnected()) {
            System.out.println("publish skipped, MQTT client is not connected: topic=" + topic + ",qos=" + qos);
            return;
        }

        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        try {
            client.publish(topic, message);
            System.out.println("Message published: topic=" + topic + ",qos=" + qos);
        } catch (MqttException e) {
            System.out.println("publish failed: topic=" + topic + ",qos=" + qos);
            System.out.println("reason " + e.getReasonCode());
            System.out.println("msg " + e.getMessage());
            System.out.println("loc " + e.getLocalizedMessage());
            System.out.println("cause " + e.getCause());
            System.out.println("excep " + e);
            e.printStackTrace();
        }
    }

    /**
     * Sends the HTTPS auth request and returns the response body.
     *
     * <p>The raw response is intentionally not printed because it carries the
     * iotToken that is later used as the MQTT password.
     *
     * @param params signed auth parameters
     * @param deviceSecret device secret (not used by this method; kept for signature compatibility)
     * @return response body with its lines concatenated
     * @throws Exception if the request cannot be sent or the response cannot be read
     */
    private String auth(JSONObject params, String deviceSecret) throws Exception {

        // Auth endpoint.
        URL url = new URL("https://iot-auth." + REGION_ID + ".aliyuncs.com/auth/devicename");

        HttpsURLConnection conn = (HttpsURLConnection) url.openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-type", "application/x-www-form-urlencoded");
            conn.setConnectTimeout(HTTP_TIMEOUT_MILLIS);
            conn.setReadTimeout(HTTP_TIMEOUT_MILLIS);
            conn.setDoOutput(true);
            conn.setDoInput(true);

            // Send the request body. A plain Writer is used because PrintWriter
            // would swallow write errors; closing the writer flushes it.
            try (OutputStreamWriter out =
                    new OutputStreamWriter(conn.getOutputStream(), StandardCharsets.UTF_8)) {
                out.write(authBody(params));
            }

            // Read the response body.
            StringBuilder result = new StringBuilder();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    result.append(line);
                }
            }
            System.out.println("----- auth result -----");
            System.out.println("http status=" + conn.getResponseCode() + ",length=" + result.length());

            return result.toString();
        } finally {
            conn.disconnect();
        }
    }

    /**
     * Builds the auth request body as {@code key=value} pairs joined by {@code &}.
     *
     * @param params signed auth parameters
     * @return request payload; empty when {@code params} has no entries
     */
    private String authBody(JSONObject params) {

        StringBuilder payload = new StringBuilder();
        for (String key : params.keySet()) {
            if (payload.length() > 0) {
                payload.append("&");
            }
            payload.append(key);
            payload.append("=");
            payload.append(params.getString(key));
        }

        System.out.println("----- auth payload -----");
        System.out.println(payload);

        return payload.toString();
    }

    /**
     * Computes the HMAC of {@code content} with {@link #HMAC_ALGORITHM}.
     *
     * @param content text to sign
     * @param secret key material
     * @return upper-case hexadecimal HMAC
     * @throws IllegalStateException if the MAC algorithm is unavailable or the key is rejected
     */
    private String encrypt(String content, String secret) {
        try {
            byte[] text = content.getBytes(StandardCharsets.UTF_8);
            byte[] key = secret.getBytes(StandardCharsets.UTF_8);
            SecretKeySpec secretKey = new SecretKeySpec(key, HMAC_ALGORITHM);
            Mac mac = Mac.getInstance(secretKey.getAlgorithm());
            mac.init(secretKey);
            return byte2hex(mac.doFinal(text));
        } catch (GeneralSecurityException e) {
            // Fail loudly: returning null here would send a request without a valid signature.
            throw new IllegalStateException("HMAC computation failed: algorithm=" + HMAC_ALGORITHM, e);
        }
    }

    /**
     * Converts bytes to an upper-case hexadecimal string.
     *
     * @param b bytes to convert; {@code null} is treated as empty
     * @return two hexadecimal digits per input byte
     */
    private String byte2hex(byte[] b) {
        if (b == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder(b.length * 2);
        for (byte value : b) {
            sb.append(HEX_DIGITS[(value >>> 4) & 0x0F]);
            sb.append(HEX_DIGITS[value & 0x0F]);
        }
        return sb.toString();
    }

    /**
     * Computes the device-side signature over the request parameters.
     *
     * @param params request parameters
     * @param deviceSecret device secret used as the HMAC key
     * @return signature as a hexadecimal string
     */
    private String sign(JSONObject params, String deviceSecret) {

        // Parameter names in lexicographic order.
        Set<String> keys = getSortedKeys(params);

        // version, sign, resources and signmethod are excluded from the signature.
        keys.remove("version");
        keys.remove("sign");
        keys.remove("resources");
        keys.remove("signmethod");

        // Plain text to sign: each key immediately followed by its value.
        StringBuilder content = new StringBuilder();
        for (String key : keys) {
            content.append(key);
            content.append(params.getString(key));
        }

        String sign = encrypt(content.toString(), deviceSecret);
        System.out.println("sign content=" + content);
        System.out.println("sign result=" + sign);

        return sign;
    }

    /**
     * Returns the keys of a JSON object in sorted order.
     *
     * @param json JSON object whose keys are wanted
     * @return a new, modifiable sorted set; changing it does not affect {@code json}
     */
    private Set<String> getSortedKeys(JSONObject json) {
        return new TreeSet<String>(json.keySet());
    }

    /**
     * Creates a TLS socket factory that trusts only the root certificate stored as
     * {@code /root.crt} on the classpath.
     *
     * @return socket factory for the MQTT connection
     * @throws Exception if the certificate is missing or the TLS context cannot be built
     */
    protected SSLSocketFactory createSSLSocket() throws Exception {

        // CA root certificate; it can be downloaded from
        // https://help.aliyun.com/document_detail/73742.html
        Certificate ca;
        try (InputStream in = IotMqttClientWithAuthByHttps.class.getResourceAsStream("/root.crt")) {
            if (in == null) {
                throw new IllegalStateException("root.crt not found on the classpath");
            }
            CertificateFactory cf = CertificateFactory.getInstance("X.509");
            ca = cf.generateCertificate(in);
        }

        // In-memory key store holding just that certificate.
        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        keyStore.load(null, null);
        keyStore.setCertificateEntry("ca", ca);

        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(keyStore);

        SSLContext context = SSLContext.getInstance("TLSV1.2");
        context.init(null, tmf.getTrustManagers(), null);
        return context.getSocketFactory();
    }

    /**
     * Demo entry point: authenticates, connects, and publishes one message.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String productKey = "您的设备productKey";
        String deviceName = "您的设备deviceName";
        String deviceSecret = "您的设备deviceSecret";
        IotMqttClientWithAuthByHttps client = new IotMqttClientWithAuthByHttps();
        client.connect(productKey, deviceName, deviceSecret);
        String updateTopic = "/" + productKey + "/" + deviceName + "/user/update";
        client.publish(updateTopic, "hello mqtt with https auth");
    }
}