本文介绍通过应用集成实现从MongoDB获取数据写入Apache Kudu的数据集成转换。完成从MongoDB指定集合获取数据,然后写入Apache Kudu目标表,过程中如果两者数据结构存在差异可以加入数据映射逻辑步骤。

前提条件

  • 在阿里云容器服务Kubernetes版上创建了MongoDB和Apache Kudu服务。操作步骤,请参见使用镜像快速创建无状态Deployment应用
  • 已经在MongoDB上创建game集合,并在game集合中存入数据。
  • 已经在Apache Kudu上创建test表。

背景信息

通过集成流配置可以实现从MongonDB指定集合中获取数据,并将数据写入到Apache Kudu的指定表中。

通过创建的集成,将完成以下动作:
  1. 从MongonDB的game集合获取数据。
  2. 将获取到的数据写入到Apache Kudu的test表中。

创建连接

本示例中会用到MongoDB和Apache Kudu,所以需要借助连接器创建对应的连接。

创建空白集成

  1. 登录应用集成控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,选择集成 > 集成列表
  4. 集成列表页面,选择目标工作空间,然后单击新建集成
  5. 新建集成面板,选择创建方式为空白流,选择目标环境,输入集成名称,然后单击创建
  6. 集成创建后,进入集成设计页面,在右上角单击保存

创建集成流

  1. 集成设计页面左上角,单击图标,在列表中单击Flow,创建集成流。
    也可以在页面中,选择点击创建 > Flow,创建集成流。
  2. 选择触发器,实现从MongoDB的game集合中获取数据。
    1. 创建新集成流对话框,输入名称,并选择MongoDB作为触发器,然后单击创建
      MongoDB触发器
    2. 选择操作对话框,单击Mongo consumer右侧的选择
      选择operation-Mongo consumer
    3. 步骤配置对话框,设置Mongo DB消费者参数,然后单击确定
      步骤配置-从MaogoDB读取数据
      Mongo DB消费者参数说明。
      参数 描述
      集合名称 获取数据的集合名称,本场景设置为game
      DB用于存储尾部跟踪 用于存储该跟踪过程的偏移量。
      唯一的id来识别这个跟踪过程 如果在集成中有多个跟踪过程,请指定一个不同的id。
      用于存储尾部跟踪的字段 属性,用于存储此尾部跟踪过程的偏移量。
      用于存储尾部跟踪的集合 字段,用于存储此尾部跟踪过程的偏移量。
      持续跟踪是否启用 是否持续跟踪。
      用于跟踪传入文档的集合字段 用于跟踪新文档的属性,通常是增量ID或时间戳,本场景设置为name
    4. 设置outputDataShape对话框,设置参数,然后单击创建
      设置outputDataShape
      设置outputDataShape参数说明。
      参数 描述
      选择schema 选择Json Schema
      schema 如果需要,可以输入JSON脚本。
      名称 schema名称。
      描述 schema的描述。
      本场景的示例JSON脚本如下。
      {
          "type":"array",
          "$schema":"http://json-schema.org/schema#",
          "items":{
              "type":"object",
              "properties":{
                  "code":{
                      "type":"string",
                      "required":true
                  },
                  "name":{
                      "type":"string",
                      "required":true
                  },
                  "year":{
                      "type":"string",
                      "required":true
                  },
                  "language":{
                      "type":"string",
                      "required":false
                  },
                  "genre":{
                      "type":"string",
                      "required":true
                  }
              }
          }
      }
    创建完成后,集成流中即出现了从MongoDB指定集合获取信息的触发器。Mongo consumer-触发器
  3. 在集成流中添加Apache Kudu连接,将获取到的数据插入到Apache Kudu的test表中。
    1. 在集成流中Mongo consumer后单击图标。
    2. 选择组件类型对话框,单击连接,然后单击Apache Kudu连接。
    3. 选择操作对话框,单击Insert a row in a kudu table右侧的选择
       选择operation-Insert a row in a kudu table
    4. 步骤配置对话框,设置Apache Kudu的指定表,然后单击确定
      步骤配置-指定表
    5. 设置inputDataShape对话框,在选择schema列表中选择任意类型,然后单击创建
    创建完成后,集成流中即出现了数据上传到Apache Kudu指定表的连接。Mongo consumer-Insert a row in a kudu table
  4. 在集成流中添加数据映射器,完成字段映射。
    1. 鼠标悬停在集成流中Mongo consumer右侧的上,然后单击图标。
    2. 选择组件类型对话框,单击逻辑步骤,然后单击data-mapper(数据映射器)
    3. Source > 1-Mongo > Document RootTarget > 2-Kudu table之间单击字段进行映射,然后单击确认
      字段映射
      字段映关系如下。
      Source字段 Target字段
      name name
      year year
      code grade

      本场景以code字段和grade字段映射为例,您可根据实际需要选择字段映射关系。

      genre
      language
    4. 设置对话框,在选择schema列表中选择任意类型,然后单击创建
  5. 集成设计页面右上角,单击保存,保存集成流。
    MongonDB处理的集成流创建完成。MongoDB处理
    说明 返回集成设计页面时,请及时在页面右上角单击保存,以免添加的步骤丢失。

部署集成

集成创建并保存后,需要对集成进行部署。具体操作,请参见部署集成示例

结果验证

  • 查看MongoDB集合数据。
    • 登录Apache Kudu,在test表中查看是否含有MongoDB的game集合的数据。
    • 您也可以在MongoDB的game集合新增一条数据,然后查看Apache Kudu的test表是否有新增的数据。
  • 验证集成部署是否正常运行。

    查看目标集成的执行记录,其执行日志状态是否为SUCCESS