本文通过一个模拟场景介绍Kafka连接器的简单使用。场景为消费者下发订餐信息(消息队列Kafka的Topic发送消息),订餐信息持久化存储到指定数据库。

前提条件

您需要提前完成以下准备工作:

背景信息

本文介绍的是实现订阅订餐信息,订餐信息持久化处理的配置流程。

Kafka的主题TopicA发送消息模拟消费者下发订餐信息,然后订餐信息存储到数据库的orders表。

订餐信息内容假设为消费者ID(customer_id)、商家ID(restaurant_id)和食品(food)的JSON语句,示例如下:
{
  "customer_id": 200,
  "restaurant_id": 800,
  "food": "apple"
}

orders表中也设置了customer_id、restaurant_id和food字段,以便和订餐信息一一对应。

创建的集成主要完成以下动作:
  1. 订阅订餐信息(即TopicA发送的消息)。
  2. 订餐信息持久化存储(即保存消息到数据库表)。

视频教程

创建连接

本示例中会用到Kafka和Database连接,所以需要借助连接器创建对应的连接。

创建Kafka和Database连接。
创建连接的具体操作,请参见创建连接

创建空白集成

  1. 登录应用集成控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,选择集成 > 集成列表
  4. 集成列表页面,选择目标命名空间,然后单击新建集成
  5. 新建集成面板,选择创建方式为空白流,选择目标环境,输入集成名称,然后单击创建
  6. 集成创建后,进入集成设计页面,在页面右上角单击保存

创建集成流

  1. 集成设计页面左上角,单击图标,在列表中单击Flow,创建集成流。
    也可以在页面中,选择点击创建 > Flow,创建集成流。
  2. 选择触发器,订阅订餐信息。
    1. 创建新集成流对话框输入集成流名称,并选择之前创建的Kafka连接作为触发器,然后单击创建
      触发器-Kafka连接
    2. 选择操作对话框中Subscribe右侧,单击选择
      选择操作-Subscribe
    3. 步骤配置对话框中设置订阅的目标Topic,然后单击确定
      步骤配置-目标TopicA
    4. 设置outputDataShape对话框选择schema列表中选择Json Instance,然后在设置outputDataShape对话框输入Json Instance格式的订餐信息,单击创建
       设置outputDataShape-Json Instance订餐信息
      参数 描述
      选择schema 订餐信息的转换格式,本示例中以Json Instance格式为例。
      schema Json Instance格式订餐信息示例。本例的订餐信息如下:
      {
        "customer_id": 200,
        "restaurant_id": 800,
        "food": "apple"
      }
      名称 自定义设置名称。
      描述 订餐信息描述。
    创建完成后,集成流中即包含订阅订餐信息的触发器。从kafak订阅消息
  3. 在集成流中添加Database连接,实现订餐信息的持久化存储。
    1. 在集成流中Subscribe右侧单击 图标。
    2. 选择组件类型对话框单击连接,然后单击之前新建的Database连接。
    3. 选择操作对话框中Invoke SQL右侧,单击选择
      选择操作-Invoke SQL
    4. 步骤配置对话框设置参数,然后单击确定
      步骤配置-SQL

      本示例输入的SQL语句为insert into orders(customer_id, restaurant_id, food) values(:#customer_id, :#restaurant_id, :#food),即在数据库表orders中写入订餐信息。

    创建完成后,集成流中即包含触发器和Database连接。触发器-Database
  4. 在集成流中添加数据映射器,实现将订餐信息字段赋值给数据库表的对应字段。
    1. 鼠标悬停在集成流中Subscribe右侧的上,然后单击 图标。
    2. 选择组件类型对话框单击逻辑步骤,然后单击data-mapper(数据映射器)
    3. Source > 1-apple-testTarget > 2-SQL Parameter之间单击相同字段进行映射,然后单击确认
      字段映射
    4. 设置对话框选择schema列表中选择任意类型,然后单击创建
    创建完成后,集成流中即包含触发器、数据映射器和Database连接。触发器-Data Mapper-Datebase
  5. 集成设计页面右上角单击保存,保存集成流。
    实现订餐信息持久化存储的集成流创建完成。
    注意 返回集成设计页面时,请及时在页面右上角单击保存,以免添加的步骤丢失。

部署集成

集成创建并保存后,需要对集成进行部署。具体操作,请参见部署集成示例

结果验证

  1. 登录Kafka服务器,在TopicA发送一条订餐信息。
    {
      "customer_id": 200,
      "restaurant_id": 800,
      "food": "apple"
    }
  2. 查询集成执行记录。
    1. 登录应用集成控制台
    2. 在顶部菜单栏,选择地域。
    3. 在左侧导航栏选择集成 > 执行管理
    4. 执行管理页面通过工作空间/集成部署环境/日期运行时长筛选目标集群的执行记录。
      集成的执行记录状态为SUCCESS即代表集成正常运行。执行记录
    5. 在目标集成的执行记录右侧单击查看
      在执行日志对话框中可以查看集成流每个步骤的状态和相关输出。集成执行日志
  3. 在数据库表查询orders表内容是否新增一条订餐信息。
    数据库新增订餐信息