本文介绍如何通过事件总线EventBridge将消息队列RocketMQ版的数据推送到函数计算。

说明 关于事件总线EventBridge支持的阿里云地域和各地域的接入点,请参见地域和接入点

步骤一:创建自定义事件源

  1. 登录事件总线EventBridge控制台
  2. 在左侧导航栏,选择事件驱动 > 事件源
  3. 在顶部菜单栏,选择地域。
  4. 快速添加自定义事件源区域,单击消息队列RocketMQ版
  5. 添加自定义事件源面板,输入名称,输入描述,选择RocketMQ实例,选择Topic,选择自定义总线,然后单击确定

步骤二:创建事件规则

注意 目标服务和事件规则必须处于同一地域。
  1. 登录事件总线EventBridge控制台
  2. 在左侧导航栏,选择事件驱动 > 事件规则
  3. 在顶部菜单栏,选择地域。
  4. 事件规则页面,选择自定义总线,然后单击创建规则
  5. 在配置向导页面,完成以下操作。
    1. 配置基本信息页面,在名称文本框输入规则名称,在描述文本框输入规则的描述,然后单击下一步
    2. 配置事件模式页面,事件源类型选择自定义事件源事件源选择步骤一创建的自定义事件源,在事件模式内容代码框输入事件模式,然后单击下一步

      如需了解更多信息,请参见事件模式

    3. 配置事件目标页面,配置事件目标,然后单击创建
      说明 1个事件规则最多可以添加5个目标。
      • 服务类型:单击函数计算
      • 服务:选择已创建的服务。
      • 函数:选择已创建的函数。
      • 事件:单击模板

        以下提供变量和模板的示例。

        变量示例:

        {
          "source":"$.source",
          "type":"$.type"
        }

        模板示例:

        The event comes from ${source},event type is ${type}.

        如需了解更多信息,请参见事件内容转换

      • 服务版本和别名:选择服务版本或服务别名。
        • 默认版本:LATEST。
        • 指定版本:选择服务版本。更多信息,请参见版本简介
        • 指定别名:选择服务别名。更多信息,请参见别名简介

步骤三:发布事件

                           
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.OnExceptionContext;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.SendCallback;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;

    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            //设置发送超时时间,单位毫秒。
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // 设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
            properties.put(PropertyKeyConst.NAMESRV_ADDR,
              "XXX");

            Producer producer = ONSFactory.createProducer(properties);
            // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
            producer.start();

            Message msg = new Message(
                    // Message所属的Topic。
                    "TopicTestMQ",
                    // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                    "TagA",
                    // Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                    "Hello MQ".getBytes());

            // 设置代表消息的业务关键属性,请尽可能全局唯一。以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
            // 注意:不设置也不会影响消息正常收发。
            msg.setKey("ORDERID_100");

            // 异步发送消息,发送结果通过callback返回给客户端。
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    // 消息发送成功。
                    System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                }

                @Override
                public void onException(OnExceptionContext context) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
                }
            });

            // 在callback返回之前即可取得msgId。
            System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

            // 在应用退出前,销毁Producer对象。 注意:如果不销毁也没有问题。
            producer.shutdown();
        }
    }                
                           
using System;
using ons;

public class ProducerExampleForEx
{
    public ProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // 配置账号,从控制台获取设置。
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // 您在控制台创建的Group ID。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // 您在控制台创建的Topic。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // 设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // 设置日志路径。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // 创建生产者实例。
        // 说明:生产者实例是线程安全的,可用于发送不同Topic的消息。基本上,您每一个线程只需要一个生产者实例。
        Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);

        // 启动客户端实例。
        producer.start();

        // 创建消息对象。
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        msg.setKey(Guid.NewGuid().ToString());
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        // 在您的线程即将退出时,关闭生产者实例。
        producer.shutdown();

    }
}    
                           
#include "ONSFactory.h"
#include "ONSClientException.h"

using namespace ons;

int main()
{

    //创建Producer,并配置发送消息所必需的信息。
    ONSFactoryProperty factoryInfo;   
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");//您在控制台创建的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); //设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// 在控制台创建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");//消息内容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");//AccessKeyId阿里云身份验证,在阿里云服务器管理控制台创建。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX" );//AccessKeySecret阿里云身份验证,在阿里云服务器管理控制台创建。


    //create producer;
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
    pProducer->start();

    Message msg(
            //Message Topic
            factoryInfo.getPublishTopics(),
            //Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
            "TagA",
            //Message Body,不能为空,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
            factoryInfo.getMessageContent()
    );

    // 设置代表消息的业务关键属性,请尽可能全局唯一。
    // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
    // 注意:不设置也不会影响消息正常收发。
    msg.setKey("ORDERID_100");

    //发送消息,只要不抛出异常,就代表发送成功。
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        //自定义处理exception的细节。
    }
    // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
    pProducer->shutdown();

    return 0;
}

结果验证

您可以在函数计算控制台使用表盘解读数据指标。

  1. 登录函数计算控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击服务及函数
  4. 服务及函数页面的服务列表中,单击目标服务。
  5. 函数列表页签,找到目标函数,然后单击函数名称列下的目标函数名称。
  6. 单击日志查询页签,查看日志。
    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****