本文介绍通过应用集成实现订单分发。完成从SFTP文件服务器获取新的订单文件,按供应商的不同自动生成不同的分类订单文件,并将分类订单文件发送到指定的接收端点(SFTP服务器的outcoming路径下),另将原订单文件上传到OSS和SMB指定路径下实现共享和备份。

前提条件

集成示例场景简介

有一个名为data.tsv订单文件,包含一些订单数据,并已经上传到SFTP服务器的incoming目录下。文件内容格式如下:

id,orderid,code,qty,price,supplier
1,SPA065037,R930072,500,93.68,SPA
2,SPA065038,R035516,100,258.71,SPA
3,SPB019402,R204189,270,51.22,SPB
4,SPB019403,R204190,280,55.33,SPB

本文创建的集成将实现以下功能:

  1. 从SFTP服务器下载data.tsv文件。
  2. 使用groovy脚本,将文件数据按supplier(SPA和SPB)拆分并生成两个订单文件spa.tsvspb.tsv
  3. spa.tsvspb.tsv分别上传到SFTP服务器的outcoming目录中。
  4. data.tsv上传到OSS的存储空间中,进行备份。
  5. data.tsv上传到SMB共享服务器的根目录中。

创建连接

本示例中会用到SFTP、SMB和OSS,所以需要借助连接器创建对应的连接。

创建空白集成

在SFTP、SMB和OSS连接创建完成后,即可创建处理自定义订单的集成。

  1. 登录应用集成控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,选择集成 > 集成列表
  4. 集成列表页面,选择目标工作空间,然后单击新建集成
  5. 新建集成面板,选择创建方式为空白流,选择目标环境,输入集成名称,然后单击创建
  6. 集成创建后,进入集成设计页面,选择接下来的操作。
    • 在右上角单击保存,创建一个空集成。
    • 在页面左上角单击图标,在列表中单击Flow,创建集成流。

创建集成流

  1. 集成设计页面左上角,单击图标,在列表中单击Flow,创建集成流。
    也可以在页面中,选择点击创建 > Flow,创建集成流。
  2. 创建触发器。
    1. 创建新集成流对话框输入名称,并选择之前创建的SFTP作为触发器,然后单击创建
      创建新集成流-SFTP触发器
    2. 选择操作对话框,单击Download右侧的选择
      由于选择了SFTP作为触发器,所以在选择操作对话框仅有Download一个操作,即从SFTP服务器下载文件。选择operation
    3. 步骤配置对话框,设置SFTP下载参数,然后单击确定
      步骤配置-从SFTP下载文件
      从SFTP服务器下载文件的参数说明如下。
      参数 描述 是否必须
      下一次轮询之前的毫秒 轮询时间间隔。
      是否下载后删除文件 是否下载后从服务器删除文件。
      文件名正则匹配 文件名的匹配规则,需满足正则表达式。
      文件名 需要下载的文件名称,本场景设置为data.tsv
      说明 建议限制文件名。如果不配置,则会递归到全部目录,有可能导致集成执行失败。
      是否递归扫描目录 是否扫描子目录下的文件。
      是否验证幂等性(避免重复消费) 在一定范围内验证文件是否被消费过,消费过的文件不会再次被消费。

      应用集成控制台会建立一个文件绝对路径+文件修改时间的集合,总共可存1000条数据,数据变更符合队列规则(LRU)。

      文件绝对路径文件修改时间都相同,且记录的数据在数据限制范围内,即代表文件已经被消费过,不会再次被消费。

      轮询开始前的毫秒数 开始轮询之前等待的时间。
      SFTP目录 要下载文件的SFTP目录名,本场景设置为incoming
    4. 设置outputDataShape对话框,在选择schema列表中选择任意类型,然后单击创建
    创建完成后,集成流即包含了从SFTP服务器下载文件的触发器。SFTP-触发器结果
    注意 如果集成比较复杂,当返回集成设计页面时,可以及时在页面右上角单击保存,以免添加的步骤丢失。
  3. 创建多播器逻辑步骤。
    1. 在集成流中单击SFTP Download触发器右侧的图标。
    2. 选择组件类型对话框,单击逻辑步骤,然后单击multicast(多播器)
    3. multicast对话框配置参数,然后单击确定
      multicast配置
      参数 描述
      是否并行执行 默认否。如果需要提升性能,可以选择是,如果启用,则同时向多播发送消息。
      说明 调用者线程将继续等待,直到所有消息都已被完全处理。 它只在消息发送和处理回复时同时进行。
      产生异常是否终止 发生异常时是否停止立即继续处理。默认否,即Camel将消息发送到所有多播,无论其中是否发生失败。
    创建成功后,集成流中即包含了多播器的步骤。组播器配置结果
    注意 如果集成比较复杂,当返回集成设计页面时,可以及时在页面右上角单击保存,以免添加的步骤丢失。
  4. 创建Transform逻辑步骤。
    1. 鼠标悬停在集成流中多播器步骤Start右侧的上,然后单击图标。
    2. 组件类型选择对话框单击逻辑步骤,然后单击转换器
    3. groovy脚本对话框输入以下groovy脚本,单击确定
      class OrderA {
          String id;
          String orderId;
          String code;
          String qty;
          String price;
          String supplier;
          OrderA(String id, String orderId, String code, String qty, String price, String supplier) {
              this.id       = id;
              this.orderId  = orderId;
              this.code     = code;
              this.qty      = qty;
              this.price    = price;
              this.supplier = supplier;
          }
      }
      
      String content = convertTo(String.class)
      def lst = content.split("\n")
      def orderList = new ArrayList<OrderA>()
      for (int i = 0; i < lst.size(); i++) {
          String[] values = lst[i].split(",")
          def order = values as OrderA
          orderList.add(order)
      }
      
      def map = [:]
      map.put("SPA", orderList.findAll{it.supplier == "SPA"})
      map.put("SPB", orderList.findAll{it.supplier == "SPB"})
      
      lst = map.get("SPA");
      content = "";
      for (int i = 0; i < lst.size(); i++) {
          String line = String.format("%s\t%s\t%s\t%s\n", lst[i].id, lst[i].orderId, lst[i].code, lst[i].qty)
          content += line;
      }
      
      payload = content
    4. 设置对话框,在选择schema列表中选择任意类型,然后单击创建
    5. 设置outputDataShape对话框,在选择schema列表中选择任意类型,然后单击创建
    创建成功后,在集成流中即包含了转换器步骤。Transform 配置结果
    注意 如果集成比较复杂,当返回集成设计页面时,可以及时在页面右上角单击保存,以免添加的步骤丢失。
  5. 添加SFTP连接,上传文件。
    1. 鼠标悬停在集成流中transform(转换器)后的上,然后单击图标。
    2. 选择组件类型对话框,单击连接,然后单击SFTP连接。
    3. 选择操作对话框,单击Upload右侧的选择
      由于选择了Transform配置,所以在选择操作对话框仅有Upload一个操作,即将文件上传到SFTP服务器。SFTP-upload
    4. 步骤配置对话框设置参数,然后单击确定
      SFTP-upload-参数配置
      文件上传到SFTP服务器的参数说明。
      参数 描述 是否必须
      文件名表达式 解析为文件名的简单语言表达式,本场景设置为spa.tsv
      文件是否存在 当上传的文件已经在服务器上时需要的行为。
      复制时的临时文件前缀 复制时临时文件前缀。
      复制时的临时文件名 复制时的临时文件名。
      SFTP目录 上传文件到的SFTP目录名,本场景设置为outcoming
    5. 设置inputDataShape对话框,在选择schema列表中选择任意类型,然后单击创建
    添加完成后,集成流中即包含了Upload步骤。SFTP-upload-结果
    注意 如果集成比较复杂,当返回集成设计页面时,可以及时在页面右上角单击保存,以免添加的步骤丢失。

为集成流添加上传文件到SFTP的并发流

  1. 编辑集成流,在多播器中增加一个并发流。
    1. 在集成流多播器步骤右上角单击图标,在列表中单击编辑
    2. 选择组件类型对话框,在并发流模板下方单击添加,然后单击确定
      添加并发流
  2. 在新添加的并发流中创建转换器逻辑步骤。
    1. 鼠标悬停在Start后的上,然后单击图标。
    2. 组件类型选择对话框单击逻辑步骤,然后单击转换器
    3. groovy脚本对话框输入以下groovy脚本,单击确定
      class OrderB {
          String id;
          String orderId;
          String code;
          String qty;
          String price;
          String supplier;
          OrderB(String id, String orderId, String code, String qty, String price, String supplier) {
              this.id       = id;
              this.orderId  = orderId;
              this.code     = code;
              this.qty      = qty;
              this.price    = price;
              this.supplier = supplier;
          }
      }
      
      String content = convertTo(String.class)
      def lst = content.split("\n")
      def orderList = new ArrayList<OrderB>()
      for (int i = 1; i < lst.size(); i++) {
          String[] values = lst[i].split(",")
          def order = values as OrderB
          orderList.add(order)
      }
      
      def map = [:]
      map.put("SPA", orderList.findAll{it.supplier == "SPA"})
      map.put("SPB", orderList.findAll{it.supplier == "SPB"})
      
      lst = map.get("SPB");
      content = "";
      for (int i = 0; i < lst.size(); i++) {
          String line = String.format("%s\t%s\t%s\t%s\n", lst[i].id, lst[i].orderId, lst[i].code, lst[i].qty)
          content += line;
      }
      
      payload = content
    4. 设置对话框,在选择schema列表中选择任意类型,然后单击创建
    5. 设置outputDataShape对话框,在选择schema列表中选择任意类型,然后单击创建
    创建成功后,新的并发流中即包含了转换器步骤。添加并发流-创建Transform-结果
    注意 如果集成比较复杂,当返回集成设计页面时,可以及时在页面右上角单击保存,以免添加的步骤丢失。
  3. 在添加的并发流中添加SFTP连接,上传文件。
    1. 鼠标悬停在并发流中转换器后的上,然后单击图标。
    2. 选择组件类型对话框,单击连接,然后单击SFTP连接。
    3. 选择操作对话框,单击Upload右侧的选择
      由于选择了Transform配置,所以在选择操作对话框仅有Upload一个操作,即将文件上传到SFTP服务器。SFTP-upload
    4. 步骤配置对话框,设置上传文件相关参数,然后单击确定
      SFTP-选择组件类型
      文件上传到SFTP服务器的参数说明。
      参数 描述 是否必须
      文件名表达式 解析为文件名的简单语言表达式,本场景设置为spb.tsv
      文件是否存在 当上传的文件已经在服务器上时需要的行为。
      复制时的临时文件前缀 复制时临时文件前缀。
      复制时的临时文件名 复制时的临时文件名。
      SFTP目录 上传文件到的SFTP目录名,本场景设置为outcoming
    5. 设置inputDataShape对话框,在选择schema列表中选择任意类型,然后单击创建
    创建成功后,添加的并发流中即包含了Upload步骤。添加并发流-结果
    注意 如果集成比较复杂,当返回集成设计页面时,可以及时在页面右上角单击保存,以免添加的步骤丢失。

为集成流添加OSS连接

在集成流中添加OSS连接,可以将处理后文件放入OSS的存储空间中。

  1. 编辑集成流,在多播器中增加一个并发流。
    1. 在集成流多播器步骤右上角单击图标,在列表中单击编辑
    2. 选择组件类型对话框,在并发流模板下方单击添加,然后单击确定
      添加并发流
  2. 在新添加的并发流中添加OSS连接。
    1. 鼠标悬停在Start后的上,然后单击图标。
    2. 选择组件类型对话框,单击连接,单击OSS连接。
    3. 选择操作对话框,单击Put File右侧的选择
      选择operation
    4. 步骤配置对话框,设置相对路径为根目录/,然后单击确定
    5. 设置inputDataShape对话框,在选择schema列表中选择任意类型,然后单击创建
    创建完成后,并发流中即包含了将文件放入存储空间中的步骤。OSS结果
    注意 如果集成比较复杂,当返回集成设计页面时,可以及时在页面右上角单击保存,以免添加的步骤丢失。

为集成流添加SMB连接

  1. 编辑集成流,在多播器中增加一个并发流。
    1. 在集成流多播器步骤右上角单击图标,在列表中单击编辑
    2. 选择组件类型对话框,在并发流模板下方单击添加,然后单击确定
      添加并发流
  2. 在新添加的并发流中添加SMB连接。
    1. 鼠标悬停在Start后的上,然后单击图标。
    2. 选择组件类型对话框,单击连接,然后单击SMB连接。
    3. 选择操作对话框,单击Upload右侧的选择
    4. 步骤配置对话框设置参数,然后单击确定
      SMB 步骤配置-上传文件
      文件上传到SMB服务器的参数说明。
      参数 描述 是否必须
      文件名表达式 解析为文件名的简单语言表达式,本场景设置为data.tsv
      文件是否存在 服务器上已经有要上传的文件时的所需行为。
      复制时的临时文件名 复制时的临时文件名。
      SMB目录 文件上传到的SMB目录,本场景设置为根目录/
    5. 设置inputDataShape对话框,在选择schema列表中选择任意类型,然后单击创建
    创建完成后,集成流中即包含了将文件上传到SMB服务器的步骤。最终结果
    注意 如果集成比较复杂,当返回集成设计页面时,可以及时在页面右上角单击保存,以免添加的步骤丢失。

部署集成

集成创建并保存后,需要对集成进行部署。具体操作,请参见部署集成示例

结果验证

  1. 登录SFTP服务器,进入outcoming目录,查看是否包含spa.tsvspb.tsv文件。
  2. 打开spa.tsvspb.tsv文件,检查是否已经按supplier SPA和SPB拆分。
  3. 登录OSS,查看是否存在data.tsv文件。
  4. 登录SMB服务器,查看是否存在data.tsv文件。