本文介绍使用AMQP协议的Go客户端接入阿里云物联网平台,接收服务端订阅消息的示例。

开发环境

本示例的测试环境为Go 1.12.7。

下载SDK

可使用以下命令导入Go语言AMQP SDK。

import "pack.ag/amqp"

SDK使用说明,请参见package amqp

代码示例

以下Demo中涉及的参数说明,请参见AMQP客户端接入说明

package main

import (
    "context"
    "crypto/hmac"
    "crypto/sha1"
    "encoding/base64"
    "fmt"
    "pack.ag/amqp"
    "time"
)
//参数说明,请参见AMQP客户端接入说明文档。
const accessKey = "${YourAccessKey}"
const accessSecret = "${YourAccessSecret}"
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
//iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
const iotInstanceId = "${YourIotInstanceId}"
//接入域名,请参见AMQP客户端接入说明文档。
const host = "${YourHost}"

func main() {
    address := "amqps://" + host + ":5671"
    timestamp := time.Now().Nanosecond() / 1000000
    //userName组装方法,请参见AMQP客户端接入说明文档。
    userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|", 
        clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
    stringToSign := fmt.Sprintf("authId=%s&timestamp=%d", accessKey, timestamp)
    hmacKey := hmac.New(sha1.New, []byte(accessSecret))
    hmacKey.Write([]byte(stringToSign))
    //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
    password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))

    amqpManager := &AmqpManager{
        address:address,
        userName:userName,
        password:password,
    }

    //如果需要做接受消息通信或者取消操作,从Background衍生context。
    ctx := context.Background()

    amqpManager.startReceiveMessage(ctx)
}

//业务函数。用户自定义实现,该函数被异步执行,请考虑系统资源消耗情况。
func (am *AmqpManager) processMessage(message *amqp.Message) {
    fmt.Println("data received:", string(message.GetData()), " properties:", message.ApplicationProperties)
}

type AmqpManager struct {
    address     string
    userName     string
    password     string
    client       *amqp.Client
    session     *amqp.Session
    receiver     *amqp.Receiver
}

func (am *AmqpManager) startReceiveMessage(ctx context.Context)  {

    childCtx, _ := context.WithCancel(ctx)
    err := am.generateReceiverWithRetry(childCtx)
    if nil != err {
        return
    }
    defer func() {
        am.receiver.Close(childCtx)
        am.session.Close(childCtx)
        am.client.Close()
    }()

    for {
        //阻塞接受消息,如果ctx是background则不会被打断。
        message, err := am.receiver.Receive(ctx)

        if nil == err {
            go am.processMessage(message)
            message.Accept()
        } else {
            fmt.Println("amqp receive data error:", err)

            //如果是主动取消,则退出程序。
            select {
            case <- childCtx.Done(): return
            default:
            }

            //非主动取消,则重新建立连接。
            err := am.generateReceiverWithRetry(childCtx)
            if nil != err {
                return
            }
        }
    }
}

func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
    //退避重连,从10ms依次x2,直到20s。
    duration := 10 * time.Millisecond
    maxDuration := 20000 * time.Millisecond
    times := 1

    //异常情况,退避重连。
    for {
        select {
        case <- ctx.Done(): return amqp.ErrConnClosed
        default:
        }

        err := am.generateReceiver()
        if nil != err {
            time.Sleep(duration)
            if duration < maxDuration {
                duration *= 2
            }
            fmt.Println("amqp connect retry,times:", times, ",duration:", duration)
            times ++
        } else {
            fmt.Println("amqp connect init success")
            return nil
        }
    }
}

//由于包不可见,无法判断Connection和Session状态,重启连接获取。
func (am *AmqpManager) generateReceiver() error {

    if am.session != nil {
        receiver, err := am.session.NewReceiver(
            amqp.LinkSourceAddress("/queue-name"),
            amqp.LinkCredit(20),
        )
        //如果断网等行为发生,Connection会关闭导致Session建立失败,未关闭连接则建立成功。
        if err == nil {
            am.receiver = receiver
            return nil
        }
    }

    //清理上一个连接。
    if am.client != nil {
        am.client.Close()
    }

    client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
    if err != nil {
        return err
    }
    am.client = client

    session, err := client.NewSession()
    if err != nil {
        return err
    }
    am.session = session

    receiver, err := am.session.NewReceiver(
        amqp.LinkSourceAddress("/queue-name"),
        amqp.LinkCredit(20),
    )
    if err != nil {
        return err
    }
    am.receiver = receiver

    return nil
}

您需按照如下表格中的参数说明,修改代码中的参数值。更多参数说明,请参见AMQP客户端接入说明

参数 示例 说明
accessKey LTAI4GFGQvKuqHJhFa******

登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。

说明 如果使用RAM用户,您需授予该用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见授权RAM用户访问物联网平台
accessSecret iMS8ZhCDdfJbCMeA005sieKe******
consumerGroupId VWhGZ2QnP7kxWpeSSjt****** 消费组ID。

登录物联网平台控制台,在对应实例的规则引擎 > 服务端订阅 > 消费组列表查看您的消费组ID。

iotInstanceId "" 实例ID。仅企业版实例需要传入。公共实例下,传入空值,即iotInstanceId = ""

您可登录物联网平台控制台,找到对应的实例,单击实例,进入实例详情页面查看企业版实例ID。

clientId 12345 表示客户端ID,建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。长度不可超过64个字符。

登录物联网平台控制台,在对应实例的规则引擎 > 服务端订阅 > 消费组列表,单击消费组对应的查看消费组详情页将显示该参数,方便您识别区分不同的客户端。

host 233***.iot-amqp.cn-shanghai.aliyuncs.com AMQP客户端接入物联网平台的接入域名。详细说明,请参见查看实例终端节点

运行结果示例

  • 成功:返回类似如下日志信息,表示AMQP客户端已接入物联网平台并成功接收消息。成功
  • 失败:返回类似如下日志信息,表示AMQP客户端连接物联网平台失败。

    您可根据日志提示,检查代码或网络环境,然后修正问题,重新运行代码。

    失败