本文通过一个模拟场景介绍RocketMQ连接器的简单使用。场景为消费者下发订餐信息(消息队列RocketMQ的Topic发送消息),订餐信息持久化存储到指定数据库,然后通知商家(消息队列RocketMQ的Topic)接单。
背景信息
本文介绍的是实现订阅订餐信息,订餐信息持久化处理,然后发布订单信息到商家以提醒商家接单的配置流程。
RocketMQ的主题TopicA发送信息模拟消费者下发订餐信息,然后订餐信息存储到数据库的orders表,最后再将信息发送到商家(RocketMQ的主题TopicB)。
订餐信息内容假设为消费者ID(customer_id)、商家ID(restaurant_id)和食品(food)的JSON语句,示例如下:
{
"customer_id": 200,
"restaurant_id": 800,
"food": "apple"
}
orders表中也设置了customer_id、restaurant_id和food字段,以便和订餐信息一一对应。
本文创建的集成将实现以下功能:
- 订阅订餐信息(即TopicA发送的消息)。
- 订餐信息持久化存储(即保存消息到数据库表)。
- 发送订餐信息到商家(即发送到TopicB),提醒商家接单。
创建连接
本示例中会用到RocketMQ和Database连接,所以需要借助连接器创建对应的连接。
创建空白集成
- 登录应用集成控制台。
- 在顶部菜单栏,选择地域。
- 在左侧导航栏,选择。
- 在集成列表页面,选择目标命名空间,然后单击新建集成。
- 在新建集成面板,选择创建方式为空白流,选择目标环境,输入集成名称,然后单击创建。
- 集成创建后,进入集成设计页面,在页面右上角单击保存。
创建集成流
- 在集成设计页面左上角,单击图标,在列表中单击Flow,创建集成流。
也可以在页面中,选择,创建集成流。
- 选择触发器,订阅订餐信息。
- 在创建新集成流对话框,输入集成流名称,选择RocketMQ连接作为触发器,然后单击创建。
- 在选择操作对话框中,单击订阅消息右侧的选择。
- 在步骤配置对话框中,设置参数,然后单击确定。
参数 |
描述 |
Topic |
订阅消息的目标主题,本文以TopicA为例。 |
consumerGroup |
消费者的Group ID,本文以GID_GroupB为例。 |
subExpression |
消息过滤表达式,满足表达式的消息才会被订阅。
说明
仅支持或运算,如tag1 || tag2 || tag3,如果设置为空和星号(*),则表示全部订阅。
本示例中如果TopicA发送的消息不带TagA,则不会触发集成流。
|
messageModel |
消息模型,保持默认值。 |
- 在设置outputDataShape对话框,在选择schema列表中选择Json Schema,然后在设置outputDataShape对话框输入Json Schema格式的订餐信息相关参数,单击创建。
参数 |
描述 |
选择schema |
订餐信息的转换格式,本示例中以Json Schema格式为例。
|
schema |
Json Schema格式订餐信息示例。
|
名称 |
自定义设置名称。 |
描述 |
订餐信息描述。 |
本示例中的
Json Schema格式订餐信息详情如下:
{
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"customer_id": {
"type": "integer"
},
"restaurant_id": {
"type": "integer"
},
"food": {
"type": "string"
}
},
"required": [
"customer_id",
"restaurant_id",
"food"
]
}
创建完成后,集成流中即包含订阅订餐信息的触发器。
- 在集成流中添加条件转换器,实现将订餐信息的Body通过Header透传。
集成流每个step运行之后,订餐信息的Body都会变化,此处通过Header透传解决此问题。
- 在集成流中订阅信息右侧单击 图标。
- 在选择组件类型对话框,单击逻辑步骤,然后单击transform(转换器)。
- 在groovy 脚本对话框,输入条件转换语句,单击确定。
本示例中的条件转换语句为
msg.setHeader("orders", msg.getBody())
,即将订阅的订餐信息的Body通过Header透传。
- 在设置对话框,在选择schema列表中选择任意类型,然后单击创建。
创建完成后,集成流中即包含触发器和条件转换器。
- 在集成流中添加Database连接,实现订餐信息的持久化存储。
- 在集成流中transform(转换器)右侧,单击图标。
- 在选择组件类型对话框,单击连接,然后单击Database连接。
- 在选择操作对话框,单击Invoke SQL右侧的选择。
- 在步骤配置对话框,设置参数,然后单击确定。
本示例输入的SQL语句为insert into orders(customer_id, restaurant_id, food) values(:#customer_id, :#restaurant_id,
:#food)
,即在数据库表orders中写入订餐信息。
- 在设置inputDataShape对话框,在选择schema列表中选择任意类型,然后单击创建。
创建完成后,集成流中即包含触发器、条件转换器和Database连接。
- 在集成流中添加数据映射器,实现将订餐信息字段赋值给数据库表的对应字段。
- 鼠标悬停在集成流中transform(转换器)右侧的上,然后单击图标。
- 在选择组件类型对话框,单击逻辑步骤,然后单击data-mapper(数据映射器)。
- 在与之间单击相同字段进行映射,然后单击确认。
- 在设置对话框选择schema列表中选择任意类型,然后单击创建。
创建完成后,集成流中即包含触发器、条件转换器、数据映射器和Database连接。
- 在集成流中添加条件转换器,实现将订餐信息的Header重新赋值给Body。
- 在集成流中Invoke SQL右侧单击 图标。
- 在选择组件类型对话框,单击逻辑步骤,然后单击transform(转换器)。
- 在groovy 脚本对话框,输入条件转换语句,单击确定。
本示例中的条件转换语句为
payload = msg.getHeader("orders")
,即将订餐信息的Header重新赋值给Body。
- 在设置对话框,在选择schema列表中选择任意类型,然后单击创建。
创建完成后,集成流中即包含触发器、条件转换器、数据映射器、Database连接和条件转换器。
- 在集成流中添加发送订餐信息到商家的步骤。
- 鼠标悬停在集成流中transform(转换器)右侧的上,然后单击图标。
- 在选择组件类型对话框,单击连接,然后单击之前新建的RocketMQ连接。
- 在选择操作对话框,单击发送消息右侧的选择。
- 在步骤配置对话框,设置参数,然后单击确定。
参数 |
描述 |
Topic |
发送消息到目标主题,本文以TopicB为例。 |
发送组(ProducerGroup) |
发送组,起标识作用,可不填。 |
sendTag |
发送消息的标签,用于对消息的再归类。 |
sendKey |
发送消息的业务标识。 |
sendMsgTimeoutMillis |
延时时长,单位毫秒,此处以3000为例。 |
- 在设置inputDataShape对话框,在选择schema列表中选择任意类型,然后单击创建。
- 在集成设计页面右上角,单击保存,保存集成流。
实现订餐信息持久化存储和提醒商家接单的集成流创建完成。
注意 返回集成设计页面时,请及时在页面右上角单击保存,以免添加的步骤丢失。
部署集成
集成创建并保存后,需要对集成进行部署。具体操作,请参见部署集成示例。
结果验证
- 消费者发送信息。
- 登录消息队列RocketMQ版控制台。
- 在实例列表页面,找到目标实例,在其操作列,单击详情。
- 在TopicA内发送消息。
- 在左侧导航栏,单击Topic管理。
- 在Topic管理页面,选择TopicA,并单击右侧的发送消息。
- 在发送消息对话框,设置消息参数,单击确定。
参数 |
描述 |
Topic |
已选定发送消息的Topic,不可编辑。 |
Tag |
发送消息携带的标签。 |
Key |
发送消息携带的业务标识。 |
消息体 |
发送的消息主体,本示例为:{
"customer_id": 200,
"restaurant_id": 800,
"food": "apple"
}
|
- 查询集成执行记录。
- 登录应用集成控制台。
- 在顶部菜单栏,选择地域。
- 在左侧导航栏,选择。
- 在执行管理页面,设置工作空间/集成、部署环境/日期和运行时长,筛选目标集群的执行记录。
集成的执行记录状态为
SUCCESS即代表集成正常运行。
- 在目标集成的执行记录右侧,单击查看。
在执行日志对话框中可以查看集成流每个步骤的状态和相关输出。
- 在数据库表查询orders表内容是否新增一条订餐信息。
- 商家查询订餐消息。
- 登录消息队列RocketMQ版控制台。
- 在实例列表页面,找到目标实例,在其操作列,单击详情。
- 在左侧导航栏,单击消息查询。
- 在消息查询页面,单击按Topic查询页签。
如果您需要了解其他查询消息的方式,请参见
消息查询。
- 在按Topic查询页签下,选择TopicB和时间,然后单击搜索。
- 在查询消息结果列表单击目标记录的右上角的下载按钮,然后在本地打开下载的文件,查看详细订餐信息。