本文介绍使用.NET语言的AMQP SDK接入阿里云物联网平台,接收服务端订阅消息的示例。

开发环境

本示例使用的开发环境要求如下表。

Framework 支持版本
.Net Framework 3.5、4.0、4.5及以上版本
.NET Micro Framework 4.2及以上版本
.NET nanoFramework 1.0及以上版本
.NET Compact Framework 3.9及以上版本
.Net Core on Windows 10 and Ubuntu 14.04 1.0及以上版本
Mono 4.2.1及以上版本

下载SDK

.NET版本AMQP SDK,推荐使用AMQP.Net Lite库。请访问AMQP.Net Lite下载库和查看使用说明。

添加依赖

packages.config中添加以下依赖。

<packages>
  <package id="AMQPNetLite" version="2.2.0" targetFramework="net47" />
</packages>

代码示例

using System;
using System.Text;
using Amqp;
using Amqp.Sasl;
using Amqp.Framing;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Security.Cryptography;

namespace amqp
{
    class MainClass
    {
        //接入域名,请参见AMQP客户端接入说明文档。
        static string Host = "${YourHost}";
        static int Port = 5671;
        static string AccessKey = "${YourAccessKey}";
        static string AccessSecret = "${YourAccessSecret}";
        static string consumerGroupId = "${YourConsumerGroupId}";
        static string clientId = "${YourClientId}";
        //iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
        static string iotInstanceId = "${YourIotInstanceId}"; 
        static int Count = 0;
        static int IntervalTime = 10000;

        static Address address;

        public static void Main(string[] args)
        {
            long timestamp = GetCurrentMilliseconds();
            string param = "authId=" + AccessKey + "&timestamp=" + timestamp;
            //userName组装方法,请参见AMQP客户端接入说明文档。
            string userName = clientId + "|authMode=aksign,signMethod=hmacmd5,consumerGroupId=" + consumerGroupId
               + ",iotInstanceId=" + iotInstanceId + ",authId=" + AccessKey + ",timestamp=" + timestamp + "|";
            //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
            string password = doSign(param, AccessSecret, "HmacMD5");

            DoConnectAmqp(userName, password);

            ManualResetEvent resetEvent = new ManualResetEvent(false);
            resetEvent.WaitOne();
        }

        static void DoConnectAmqp(string userName, string password)
        {
            address = new Address(Host, Port, userName, password);
            //创建Connection。
            ConnectionFactory cf = new ConnectionFactory();
            //如果需要,使用本地TLS。
            //cf.SSL.ClientCertificates.Add(GetCert());
            //cf.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
            cf.SASL.Profile = SaslProfile.External;
            cf.AMQP.IdleTimeout = 120000;
            //cf.AMQP.ContainerId、cf.AMQP.HostName请自定义。
            cf.AMQP.ContainerId = "client.1.2"; 
            cf.AMQP.HostName = "contoso.com";
            cf.AMQP.MaxFrameSize = 8 * 1024;
            var connection = cf.CreateAsync(address).Result;

            //Connection Exception已关闭。
            connection.AddClosedCallback(ConnClosed);

            //接收消息。
            DoReceive(connection);
        }

        static void DoReceive(Connection connection)
        {
            //创建Session。
            var session = new Session(connection);

            //创建Receiver Link并接收消息。
            var receiver = new ReceiverLink(session, "queueName", null);


            receiver.Start(20, (link, message) =>
            {
                object messageId = message.ApplicationProperties["messageId"];
                object topic = message.ApplicationProperties["topic"];
                string body = Encoding.UTF8.GetString((Byte[])message.Body);
                //注意:此处不要有耗时的逻辑,如果这里要进行业务处理,请另开线程,否则会堵塞消费。如果消费一直延时,会增加消息重发的概率。
                Console.WriteLine("receive message, topic=" + topic + ", messageId=" + messageId + ", body=" + body);

                //ACK消息。
                link.Accept(message);
            });


        }

        //连接发生异常后,进入重连模式。
        //这里只是一个简单重试的示例,您可以采用指数退避方式,来完善异常场景,重连策略。
        static void ConnClosed(IAmqpObject _, Error e)
        {
            Console.WriteLine("ocurr error: " + e);
            if(Count < 3)
            {
                Count += 1;
                Thread.Sleep(IntervalTime * Count);
            }
            else
            {
                Thread.Sleep(120000);
            }

            //重连。
            DoConnectAmqp(address.User, address.Password);
        }

        static X509Certificate GetCert()
        {
            string certPath = Environment.CurrentDirectory + "/root.crt";
            X509Certificate crt = new X509Certificate(certPath);

            return crt;
        }

        static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            return true;
        }

        static long GetCurrentMilliseconds()
        {
            DateTime dt1970 = new DateTime(1970, 1, 1);
            DateTime current = DateTime.Now;
            return (long)(current - dt1970).TotalMilliseconds;
        }

        //签名方法:支持hmacmd5,hmacsha1和hmacsha256。
        static string doSign(string param, string accessSecret, string signMethod)
        {
            //signMethod = HmacMD5
            byte[] key = Encoding.UTF8.GetBytes(accessSecret);
            byte[] signContent = Encoding.UTF8.GetBytes(param);
            var hmac = new HMACMD5(key);
            byte[] hashBytes = hmac.ComputeHash(signContent);
            return Convert.ToBase64String(hashBytes);
        }
    }
}

您需按照如下表格中的参数说明,修改代码中的参数值。更多参数说明,请参见AMQP客户端接入说明

参数 示例 说明
Host 233***.iot-amqp.cn-shanghai.aliyuncs.com AMQP客户端接入物联网平台的接入域名。详细说明,请参见查看实例终端节点
AccessKey LTAI4GFGQvKuqHJhFa******

登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。

说明 如果使用RAM用户,您需授予该用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见授权RAM用户访问物联网平台
AccessSecret iMS8ZhCDdfJbCMeA005sieKe******
consumerGroupId VWhGZ2QnP7kxWpeSSjt****** 消费组ID。

登录物联网平台控制台,在对应实例的规则引擎 > 服务端订阅 > 消费组列表查看您的消费组ID。

iotInstanceId "" 实例ID。仅企业版实例需要传入。公共实例下,传入空值,即iotInstanceId = ""

您可登录物联网平台控制台,找到对应的实例,单击实例,进入实例详情页面查看企业版实例ID。

clientId 12345 表示客户端ID,建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。长度不可超过64个字符。

登录物联网平台控制台,在对应实例的规则引擎 > 服务端订阅 > 消费组列表,单击消费组对应的查看消费组详情页将显示该参数,方便您识别区分不同的客户端。

运行结果示例

  • 成功:返回类似如下日志信息,表示AMQP客户端已接入物联网平台并成功接收消息。成功
    参数 示例 说明
    topic /***********/******/thing/event/property/post 设备属性上报的Topic
    messageId 1324198300680719360 消息的ID
    body {"deviceType":"CustomCategory","iotId":"4EwuVV***","requestId":"161268***","checkFailedData":{},"productKey":"g4***S","gmtCreate":1612682173249,"deviceName":"Esensor","items":{"temperature":{"value":-1,"time":1612682173247},"humidity":{"value":74,"time":1612682173247}}} 消息的内容
  • 失败:返回类似如下日志信息,表示AMQP客户端连接物联网平台失败。失败