物联网平台支持自行封装Alink协议数据,建立设备与物联网平台的通信。
Alink协议是针对物联网开发领域设计的一种数据交换规范,详情请参见Alink协议。
其中,物模型相关协议请参见设备属性、事件、服务。
准备工作
根据您的实际情况,准备合适的MQTT客户端。本示例使用Eclipse Paho作为MQTT客户端。
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.1.1</version>//可以选择您需要的版本 </dependency>
开发Java语言物模型,具体操作请参见物模型开发。
示例代码
示例代码如下所示。将示例代码中的your_ProductKey、your_DeviceName、your_DeviceSecret替换为您自己的设备证书信息,即充电桩设备(device-1)的设备证书,其余代码可直接运行。
为了避免初始化时订阅大量Alink协议中系统Topic带来的性能开销,平台提供了免订阅能力,即物联网平台帮设备进行Topic订阅。Topic相关说明请参见什么是Topic。
所有被订阅的下行Topic都会被物联网平台监听。物模型相关的下行Topic主要包括属性上报Reply、属性下行设置和服务下行控制。
上报属性
设置设备属性时,需要订阅的Topic为Alink协议标准Topic:
/sys/{productKey}/{deviceName}/thing/service/property/set
,默认以异步方式返回结果。String productKey = "your_ProductKey"; String deviceName = "your_DeviceName"; String deviceSecret = "your_DeviceSecret"; // MQTT连接 MqttTestClient client; client = new MqttTestClient(productKey, deviceName, deviceSecret); client.connect(); String setTopic = "/thing/event/property/post"; String setTopicReply = "/thing/event/property/post_reply"; // 上报属性,云端会返回REPLY,进行订阅。(为了节省端侧订阅开销,可以开通免订阅) // 此处client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client subscribe client.sysTopic(setTopicReply).subscribe(); // 封装Alink协议系统参数 Map<String, Object> payload = new HashMap<String, Object>(); Map<String, Object> params = new HashMap<String, Object>(); payload.put("id", 11);//id需要保证设备端一段时间内唯一 payload.put("params", params); payload.put("method", "thing.event.property.post"); // 组装属性payload String propKey = "acOutMeterIty"; int statusValue = 30; Map<String, Object> proValue = new HashMap<>(); proValue.put("value", statusValue); proValue.put("time", System.currentTimeMillis()); params.put(propKey, proValue); // 上报(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client publish消息) client.sysTopic(setTopic).publish(JSON.toJSONString(payload)); // 打印云端返回的Reply(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client监听订阅消息) client.sysTopic(setTopicReply).readTopic(10000); client.disconnect();
运行代码成功后,日志打印的设备请求和响应如下图所示。
上报事件
String productKey = "your_ProductKey"; String deviceName = "your_DeviceName"; String deviceSecret = "your_DeviceSecret"; // MQTT连接 MqttTestClient client; client = new MqttTestClient(productKey, deviceName, deviceSecret); client.connect(); // topic中为"startChaResEvt"属于物模型事件标识符。 String setTopic = "/thing/event/startChaResEvt/post"; String setTopicReply = "/thing/event/startChaResEvt/post_reply"; // 报事件,云端会返回REPLY,进行订阅。(为了节省端侧订阅开销,可以开通免订阅) client.sysTopic(setTopicReply).subscribe(); // 封装Alink协议系统参数 Map<String, Object> payload = new HashMap<String, Object>(); Map<String, Object> params = new HashMap<String, Object>(); payload.put("id", 11);//id需要保证设备端一段时间内唯一 payload.put("params", params); payload.put("method", "thing.event.startChaResEvt.post"); // 组装属性payload Map<String, Object> dataValue = new HashMap<>(); // key为物模型中事件参数的标识符"gunNum", value为事件参数值需要遵循数值规范:int类型,取值范围0~100之间; dataValue.put("gunNum", 59); params.put("value", dataValue); params.put("time", System.currentTimeMillis()); // 上报(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client publish消息) client.sysTopic(setTopic).publish(JSON.toJSONString(payload)); // 打印云端返回的Reply(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client监听订阅消息) client.sysTopic(setTopicReply).readTopic(10000); client.disconnect();
调用服务
此处为一段伪代码。可以在MQTT建连时,通过callback监听云端下发的控制指令或消息。
调用服务时,同步异步方式取决于物模型中service配置的调用模式:
服务异步方式订阅的Topic为Alink协议标准Topic:
/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}
。服务同步方式订阅的Topic需要遵循RRPC Topic模式。
mqttClient = new MqttClient(url, clientId, persistence); final MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setMqttVersion(4); connOpts.setAutomaticReconnect(true); connOpts.setCleanSession(false); connOpts.setUserName(mqttUsername); connOpts.setPassword(mqttPassword.toCharArray()); connOpts.setKeepAliveInterval(65); LogUtil.log(clientId + "进行连接, 目的地: " + url); // 此处订阅云端下发的消息 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { LogUtil.log("connection lost, cause:" + cause); cause.printStackTrace(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { TopicChannel topicChannel = getTopic(topic); LogUtil.log("receive message, channel:" + topicChannel + ",topic:" + topic + ", payload:" + new String(message.getPayload(), "UTF-8") + ""); topicChannel.put(message); } @Override public void deliveryComplete(IMqttDeliveryToken token) { //如果是qos 0消息 token.resp是没有回复的 LogUtil.log("sent, " + ((token == null || token.getResponse() == null) ? "null" : token.getResponse().getKey())); } }); mqttClient.connect(connOpts);
在文档使用中是否遇到以下问题
更多建议
匿名提交