您可以在物联网平台上自定义Topic类,设备将消息发送到自定义Topic中,服务端通过AMQP SDK获取设备上报消息;服务端通过调用物联网平台接口Pub向设备发布指令。自定义Topic通信不使用物模型,消息的数据结构由您自定义。

背景信息

本示例中,电子温度计定期与服务器进行数据的交互,传递温度和指令等信息。温度计向服务器上行发送当前的温度;服务器向温度计下行发送精度设置指令。

自定义Topic通信

准备开发环境

本示例中,设备端和云端均使用Java语言的SDK,需先准备Java开发环境。您可从Java 官方网站下载,并安装Java最新开发环境。

创建产品和设备

  1. 登录物联网平台控制台
  2. 实例概览页面,找到对应的实例,单击实例进入实例详情页面。
    实例概览
  3. 在左侧导航栏,单击设备管理 > 产品
  4. 单击创建产品,创建温度计产品,获取productKey,例如a1uzcH0****
    详细操作指导,请参见创建产品
  5. 产品创建成功后,单击该产品对应的查看
  6. 产品详情页的Topic类列表页签下,单击自定义Topic,增加自定义Topic类。

    详细操作指导,请参见自定义Topic

    本示例中,定义了以下两个Topic类:

    • 设备发布消息Topic:/a1uzcH0****/${deviceName}/user/devmsg,权限为发布。
    • 设备订阅消息Topic:/a1uzcH0****/${deviceName}/user/cloudmsg,权限为订阅。
  7. 服务端订阅页签下,单击创建订阅,设置AMQP服务端订阅,订阅设备上报消息默认消费组
    详细操作指导,请参见配置AMQP服务端订阅
  8. 在左侧导航栏,单击设备,然后在刚创建的温度计产品下,添加设备device1,获取设备证书ProductKeyDeviceNameDeviceSecret
    详细操作指导,请参见单个创建设备

设备发送消息给服务器

流程图:

自定义Topic通信

在整个流程中:

  • 服务器通过AMQP客户端接收消息,需配置AMQP客户端接入物联网平台,监听设备消息。具体操作,请参见Java SDK接入示例
    注意 AMQP订阅消息的消费组,必须与设备在相同的物联网平台实例下。
  • 配置设备端SDK接入物联网平台,并发送消息。
    • 配置设备认证信息。
      final String productKey = "a1uzcH0****";
      final String deviceName = "device1";
      final String deviceSecret = "uwMTmVAMnxB***";
      final String region = "cn-shanghai";
    • 设置初始化连接参数,包括MQTT连接配置、设备信息和初始物模型属性。
      LinkKitInitParams params = new LinkKitInitParams();
      //LinkKit底层是MQTT协议,设置MQTT的配置。
      IoTMqttClientConfig config = new IoTMqttClientConfig();
      config.productKey = productKey;
      config.deviceName = deviceName;
      config.deviceSecret = deviceSecret;
      config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883";
      //设备的信息。
      DeviceInfo deviceInfo = new DeviceInfo();
      deviceInfo.productKey = productKey;
      deviceInfo.deviceName = deviceName;
      deviceInfo.deviceSecret = deviceSecret;
      //报备的设备初始状态。
      Map<String, ValueWrapper> propertyValues = new HashMap<String, ValueWrapper>();
      
      params.mqttClientConfig = config;
      params.deviceInfo = deviceInfo;
      params.propertyValues = propertyValues;
    • 初始化连接。
      //连接并设置连接成功以后的回调函数。
      LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
           @Override
           public void onError(AError aError) {
               System.out.println("Init error:" + aError);
           }
      
           //初始化成功以后的回调。
           @Override
           public void onInitDone(InitResult initResult) {
               System.out.println("Init done:" + initResult);
           }
       });
    • 设备发送消息。

      设备端连接物联网平台后,向以上定义的Topic发送消息。需将onInitDone函数内容替换为以下内容:

      @Override
       public void onInitDone(InitResult initResult) {
           //设置Pub消息的Topic和内容。
           MqttPublishRequest request = new MqttPublishRequest();
           request.topic = "/" + productKey + "/" + deviceName + "/user/devmsg";
           request.qos = 0;
           request.payloadObj = "{\"temperature\":35.0, \"time\":\"sometime\"}";
           //发送消息并设置成功以后的回调。
           LinkKit.getInstance().publish(request, new IConnectSendListener() {
               @Override
               public void onResponse(ARequest aRequest, AResponse aResponse) {
                   System.out.println("onResponse:" + aResponse.getData());
               }
      
               @Override
               public void onFailure(ARequest aRequest, AError aError) {
                   System.out.println("onFailure:" + aError.getCode() + aError.getMsg());
               }
           });
       }

      服务器收到消息如下:

      Message
      {payload={"temperature":35.0, "time":"sometime"},
      topic='/a1uzcH0****/device1/user/devmsg',
      messageId='1131755639450642944',
      qos=0,
      generateTime=1558666546105}

    实际业务场景中,您需修改以下参数值。

    参数 示例 说明
    productKey a1uzcH0****

    设备认证信息,请参见创建产品和设备中的步骤8

    deviceName device1
    deviceSecret uwMTmVAMnxB***
    region cn-shanghai 您的物联网平台设备所在地域代码。地域代码表达方法,请参见地域和可用区
    channelHost iot-o***.mqtt.iothub.aliyuncs.com:1883 设备接入地址。详细说明,请参见查看实例终端节点
    注意 接入域名若没有携带端口号1883,需添加该端口号。
    request.topic "/" + productKey + "/" + deviceName + "/user/devmsg" 具有发布权限的自定义Topic。
    request.payloadObj "{\"temperature\":35.0, \"time\":\"sometime\"}" 自定义的消息内容。

服务器发送消息给设备

流程图:

自定义Topic通信
  • 配置设备端SDK订阅Topic。

    配置设备认证信息、设置初始化连接参数、初始化连接,请参见设备发送消息给服务器中的相应示例代码。

    设备要接收服务器发送的消息,还需订阅消息Topic。

    配置设备端订阅Topic示例如下:

    //初始化成功以后的回调。
    @Override
    public void onInitDone(InitResult initResult) {
        //设置订阅的Topic。
        MqttSubscribeRequest request = new MqttSubscribeRequest();
        request.topic = "/" + productKey + "/" + deviceName + "/user/cloudmsg";
        request.isSubscribe = true;
        //发出订阅请求并设置订阅成功或者失败的回调函数。
        LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() {
            @Override
            public void onSuccess() {
                System.out.println("");
            }
    
            @Override
            public void onFailure(AError aError) {
    
            }
        });
    
        //设置订阅的下行消息到来时的回调函数。
        IConnectNotifyListener notifyListener = new IConnectNotifyListener() {
            //此处定义收到下行消息以后的回调函数。
            @Override
            public void onNotify(String connectId, String topic, AMessage aMessage) {
                System.out.println(
                    "received message from " + topic + ":" + new String((byte[])aMessage.getData()));
            }
    
            @Override
            public boolean shouldHandle(String s, String s1) {
                return false;
            }
    
            @Override
            public void onConnectStateChange(String s, ConnectState connectState) {
    
            }
        };
        LinkKit.getInstance().registerOnNotifyListener(notifyListener);
    }

    其中,request.topic值需修改为具有订阅权限的自定义Topic。

  • 配置云端SDK调用物联网平台接口Pub发布消息。参数说明,请参见Pub,使用说明,请参见Java SDK使用说明
    • 设置身份认证信息。
       String regionId = "XXXXXX";
       String accessKey = "XXXXXX";
       String accessSecret = "XXXXXXXXX";
       final String productKey = "XXXXXX";
    • 设置连接参数。
      //设置client的参数。
      DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKey, accessSecret);
      IAcsClient client = new DefaultAcsClient(profile);
    • 设置消息发布参数。
      PubRequest request = new PubRequest();
      request.setQos(0);
      //设置发布消息的Topic。
      request.setTopicFullName("/" + productKey + "/" + deviceName + "/user/cloudmsg");
      request.setProductKey(productKey);
      //设置消息的内容,一定要用Base64编码,否则乱码。
      request.setMessageContent(Base64.encode("{\"accuracy\":0.001,\"time\":now}"));
    • 发送消息。
      try {
           PubResponse response = client.getAcsResponse(request);
           System.out.println("pub success?:" + response.getSuccess());
       } catch (Exception e) {
           System.out.println(e);
       }
      设备端接收到的消息如下:
      msg = [{"accuracy":0.001,"time":now}]

附录:代码示例

说明 实际业务场景中,请按照上文描述,修改代码中相关参数的值。

下载Pub/Sub demo,包含本示例的云端SDK和设备端SDK配置代码Demo。

AMQP客户端接入物联网平台示例,请参见: