AMQP转储功能适用于生活物联网平台与企业服务器之间的消息流转,且为推荐方式。通过集成和使用AMQP SDK,即可实现身份认证、消息接收的能力。我们推荐使用AMQP的方式推送设备数据(如设备状态数据、设备控制记录等),用户信息数据等。

前提条件

开启设备数据同步,并配置要同步数据的产品。详细参见设置数据同步数据同步
说明 当开启数据同步后,集成AMQP客户端SDK来订阅数据。如果通过控制台关闭数据同步再开启时,客户端SDK需要重新进行连接流程,否则无法正常接收数据。如果切换不同的AMQP客户端,需要把之前的客户端断开,再连接新的客户端。否则新客户端无法正常接收数据。

AMQP SDK使用

  1. 引入依赖。
    AMQP SDK为开源SDK。如果您使用Java开发语言,推荐使用Apache Qpid JMS客户端。在项目中添加maven依赖,maven信息如下。
    <!-- amqp 1.0 qpid client -->
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-jms-client</artifactId>
       <version>0.47.0</version>
     </dependency>
     <!-- util for base64-->
     <dependency>
       <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.10</version>
    </dependency>           
  2. 认证身份信息。
    • 身份认证需要使用AppKey和AppSecret,该信息可以从控制台中获取。获取身份信息
    • 认证身份信息需要使用EndPoint、AppKey和AppSecret用于鉴权。

      其中,EndPoint是连接节点,具体取值如下表所示。

      区域 End Point
      中国内地 amqps://ilop.iot-amqp.cn-shanghai.aliyuncs.com:5671
      新加坡 amqps://ilop.iot-amqp.ap-southeast-1.aliyuncs.com:5671
      美国(弗吉尼亚) amqps://ilop.iot-amqp.us-east-1.aliyuncs.com:5671
      德国(法兰克福) amqps://ilop.iot-amqp.eu-central-1.aliyuncs.com:5671
  3. 接收云端消息。

    首先需要创建消息接收的客户端对象client,并传入上面身份认证的profile信息。当消息接收的客户端和服务端建立连接后,服务端会立即向消息接收的客户端推送已订阅的消息,因此建立连接时需要提供默认消息接收的回调接口,用于处理云端推送的消息。

    完整代码示例如下。

    import java.net.URI;
    import java.util.Hashtable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.qpid.jms.JmsConnection;
    import org.apache.qpid.jms.JmsConnectionListener;
    import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class AmqpJavaClientDemo {
    
        private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);
    
        //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
        private final static ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000));
    
        public static void main(String[] args) throws Exception {
    
            String appKey = "${YourAppkey}";
            String appSecret = "${YourAppSecret}";
            String consumerGroupId = "${YourAppkey}";
            long random = xxxxx;
            //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
            String clientId = "${YourClientId}";
    
            String userName = clientId + "|authMode=appkey"                 
                    + ",signMethod=" + "SHA256"
                    + ",random=" + random
                    + ",appKey=" + appKey
                    + ",groupId=" + consumerGroupId + "|";
            String signContent = "random=" + random;
            String password = doSign(signContent, appSecret, "HmacSHA256");
            String connectionUrlTemplate = "failover:(${AMQPEndPointUrl}?amqp.idleTimeout=80000)"
                    + "?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";
    
            Hashtable<String, String> hashtable = new Hashtable<>();
            hashtable.put("connectionfactory.SBCF",connectionUrlTemplate);
            hashtable.put("queue.QUEUE", "default");
            hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
            Destination queue = (Destination)context.lookup("QUEUE");
            // 创建连接。
            Connection connection = cf.createConnection(userName, password);
            ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
            // 创建会话。
            // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
            // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
            // 创建Receiver连接。
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(messageListener);
        }
    
        private static MessageListener messageListener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    //1.收到消息之后一定要ACK。
                    // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
                    // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
                    // message.acknowledge();
                    //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
                    // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
                    executorService.submit(() -> processMessage(message));
                } catch (Exception e) {
                    logger.error("submit task occurs exception ", e);
                }
            }
        };
    
        /**
         * 在这里处理您收到消息后的具体业务逻辑。
         */
        private static void processMessage(Message message) {
            try {
                byte[] body = message.getBody(byte[].class);
                String content = new String(body);
                String topic = message.getStringProperty("topic");
                String messageId = message.getStringProperty("messageId");
                logger.info("receive message"
                    + ", topic = " + topic
                    + ", messageId = " + messageId
                    + ", content = " + content);
            } catch (Exception e) {
                logger.error("processMessage occurs error ", e);
            }
        }
    
        private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
            /**
             * 连接成功建立。
             */
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
            }
    
            /**
             * 尝试过最大重试次数之后,最终连接失败。
             */
            @Override
            public void onConnectionFailure(Throwable error) {
                logger.error("onConnectionFailure, {}", error.getMessage());
            }
    
            /**
             * 连接中断。
             */
            @Override
            public void onConnectionInterrupted(URI remoteURI) {
                logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
            }
    
            /**
             * 连接中断后又自动重连上。
             */
            @Override
            public void onConnectionRestored(URI remoteURI) {
                logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
            }
    
            @Override
            public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
    
            @Override
            public void onSessionClosed(Session session, Throwable cause) {}
    
            @Override
            public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
    
            @Override
            public void onProducerClosed(MessageProducer producer, Throwable cause) {}
        };
    
        /**
         * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
         */
        private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
            SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
            Mac mac = Mac.getInstance(signMethod);
            mac.init(signingKey);
            byte[] rawHmac = mac.doFinal(toSignString.getBytes());
            return Hex.encodeHexString(rawHmac);
        }
    }
    说明 消息推送失败时,平台会重新推送,重试策略如下。
    • 如果对端不在线或未回复ack消息,则会造成消息堆积,堆积的消息转为离线消息。
    • 离线消息每隔5min重试推送一次(每次推送10条)。对端如果成功接收了消息,则重试策略会继续推送剩余的离线消息(推送失败的消息,下一次继续推送)。
    • 离线消息最多会保存 1 天,如果 1 天后仍然无法推送成功,则会被删除。
    • 离线消息会进入单独的队列,不会影响后续消息的实时推送。

消息格式

  • 物的属性变更消息

    消息字段说明如下。

    参数 子参数 子参数 类型 含义
    deviceType String 设备所属品类
    gmtCreate Long 数据流转消息产生时间,自1970-1-1起流逝的毫秒值
    iotId String 设备的唯一id
    productKey String 设备所属产品的唯一标识符
    deviceName String 设备名称
    items JSON 变更的状态列表
    attribute String 发生变更的属性,具体取值由具体情况确定
    value 具体数据类型由具体情况确定 变更值
    time Long 设备属性发生变化的时间,自1970-1-1起流逝的毫秒值

    消息示例如下。

    {
        "deviceType": "SmartDoor",
        "iotId": "Xzf15db9xxxxxxxxWR001046b400",
        "productKey": "a17xxxxTYNA",
        "gmtCreate": 153xxxx145304,
        "deviceName": "Xzf15xxxxucTHBgUo6WR",
        "items": {
            "WIFI_Rx_Rate": {
                "value": 74274,
                "time": 1534299145344
            }
        }
    }  
  • 物的事件变更消息

    消息字段说明如下。

    参数 子参数 类型 含义
    deviceType String 设备所属品类
    iotId String 设备的唯一id
    productKey String 设备所属产品的唯一标识符
    deviceName String 设备名称
    identifier String 事件标识符,对应事件的identifier
    name String 事件名称
    type String 事件类型
    time Long 设备上报value对应的时间,自1970-1-1起流逝的毫秒值
    value JSON 变更的事件属性列表:key-value键值对
    key String 属性key
    value 具体数据类型由具体情况确定 属性取值

    消息示例如下。

    {
        "deviceType": "SmartDoor",
        "identifier": "Doorxxxxication",
        "iotId": "Xzf15db9xxxxxxxxx01046b400",
        "name": "开门通知",
        "time": 1534319108982,
        "type": "info",
        "productKey": "a17xxxxTYNA",
        "deviceName": "Xzf15xxxxucTHBgUo6WR",
        "value": {
            "KeyID": "x8xxxxxkDY",
            "LockType": 3
        }
    }
  • 设备服务返回消息

    消息字段说明如下。

    参数 类型 含义
    gmtCreate Long 数据流转消息产生时间,自1970-1-1起流逝的毫秒值
    iotId String 设备的唯一id
    productKey String 设备所属产品的唯一标识符
    deviceName String 设备名称
    requestId String 阿里云产生和设备通信的信息id
    code Integer 调用的结果信息
    message String 结果信息说明
    topic String 服务调用下行时使用的topic
    data Object 设备返回的结果,非透传之间返回设备结果,透传则需要经过脚本转换

    消息示例如下。

    {
      "gmtCreate": 151xxxx39881,
      "iotId": "4z819VQHxxxxxxxxxxxx7ee200",
      "productKey": "p1gxxxxBd",
      "deviceName": "xxxxxxxxxx",
      "requestId": "1234",
      "code": 200,
      "message": "success",
      "topic": "/sys/p1gsv0teUBd/xxxxxxxxxx/thing/service/property/set_reply",
      "data": {}
    }           
  • 物的状态变更消息

    为了提高消息有效性,设备上下线过于频繁时,会对消息进行筛检。

    消息字段说明如下。

    参数 子参数 类型 含义
    deviceType String 设备所属品类
    gmtCreate Long 数据流转消息产生时间,自1970-1-1起流逝的毫秒值
    iotId String 设备的唯一id
    action String 设备状态变更动作:
    • online:上线动作
    • offline:下线动作
    productKey String 设备所属产品的唯一标识符
    deviceName String 设备名称
    status JSON 状态信息,元素包括:value-状态值,time-发生变化的时间
    time Long 设备上下线状态发生变化的时间,自1970-1-1起流逝的毫秒值
    value String 设备上下线状态

    value状态值定义:

    • 1:在线
    • 0:离线

    消息示例如下。

    {
        "deviceType": "SmartDoor",
        "iotId": "Xzf15dxxxxxxxxxxxxxxxx01046b400",
        "action": "online",
        "productKey": "a17xxxxxxTYNA",
        "gmtCreate": 153xxxx1368,
        "deviceName": "Xzf1xxxxxxxxxxxxgUo6WR",
        "status": {
            "time": 1534319611368,
            "value": "1"
        }
    }       
  • 用户绑定变更消息

    用户绑定/解绑设备产生的回流消息,用于同步用户与设备的绑定、解绑。

    topic: /${pk}/${dn}/thing/awss/enrollee/user

    消息字段说明如下。

    参数 子参数 类型 含义
    bind bool true-绑定;false-解绑
    productKey String 设备所属产品的唯一标识符
    deviceName String 设备名称
    iotId String 设备的唯一id
    messageCreateTime JSON 消息创建时间
    identityInfos list 用户信息列表
    identityId String 用户身份id
    scopeId String 隔离id
    tenantId String 租户id
    owned Integer 拥有标记
    • 0:分享者
    • 1:拥有者
    params Map 扩展参数(暂未使用)
    {
      "bind":true,
      "productKey": "123xxxx569",
      "deviceName": "deviceNamexxxx34",
      "iotId": "",
      "messageCreateTime": 151xxxx9881,
      "identityInfos":[
         {
           "identityId":"50xxxxxxxxxxxx62060259",
           "scopeId":"",
           "tenantId":"1D89B5xxxxxxxxxxxxxxxx861678FF",
           "owned":1
         }
      ],
      "params":{
      }
    }