文档

Kafka实时ETL同步至Hologres

更新时间:

实时ETL同步方案根据来源Kafka指定Topic的内容结构对目的Hologres表结构做初始化,然后将Kafka指定Topic的存量数据同步至Hologres,同时也持续将增量数据实时同步至Hologres。本文为您介绍如何创建Kafka实时ETL同步至Hologres任务。

使用限制

添加数据源

新建Kafka数据源

您可以手动添加Kafka数据源至DataWorks,详情请参见:Kafka数据源

新建Hologres数据源

  • 获取Hologres数据源信息

    进入Hologres产品控制台。找到您要进行数据同步的Hologres数据源,在实例详情界面获取到Hologres的实例ID、地域信息、数据源地址。如果Hologres开通了指定VPC的网络链接,则可以获取到VPC ID、Vswitch ID。

  • 手动添加Hologres数据源

    可通过绑定Hologres计算引擎自动添加Hologres数据源,或者手动添加Hologres数据源

准备独享数据集成资源组并与数据源网络连通

在进行数据同步前,需要完成您的独享数据集成资源组和数据源的网络连通,详情请参见配置网络连通

说明

Kafka与Hologres支持的网络类型如下:

  • Kafka: 指定VPC网络、公网。

  • Hologres: 指定VPC网络、通用VPC网络(AnyTunnel)、公网。

  • 如果您的独享数据集成资源组和数据源属于同一地域,可使用同地域VPC内网(AnyTunnel或者SingleTunnel)连通独享资源组和数据源。如果是SingleTunnel网络类型实现网络连通需要执行:

    1. 在DataWorks侧新增专有网络绑定和自定义路由。

    2. 在数据源侧添加数据库白名单。

  • 如果您的独享数据集成资源组和数据源属于不同地域,可使用公网连通独享资源组和数据源。实现网络连通需要执行:在数据源侧添加数据库白名单。

步骤1 新增专有网络绑定和自定义路由

说明

如果您使用公网连通独享资源组和数据源,可跳过此步骤。

  1. 新增专有网络绑定。

    1. 进入DataWorks管控台资源组列表页面,找到您要连通的独享数据集成资源组,单击资源组右侧的网络设置,进入资源组网络设置页面。

    2. 专有网络绑定页签下,单击新增绑定,在弹出的新增专有网络绑定窗口中,填写专有网络信息。

      • 专有网络:选择需要同步的数据源所绑定的专有网络。

      • 可用区、交换机:优先选择数据源所在的可用区和交换机;如果所在的可用区不可选择,则选择其他任意可用区和交换机。但您需要确保VPC网络跟数据源所在的VPC网络可连通。

      • 安全组:可以选择您名下任意自建安全组,但所选择的安全组需满足以下条件:

        • 安全组产品控制台查看安全组,访问规则的入方向允许放行Kafka集群HTTP端口(通常为9092到9094)。

        • 安全组的授权对象网段包含上一步所选择的交换机网段。

  2. 添加自定义路由。

    说明

    如果您在上述步骤中选择了数据源所在的可用区和交换机,可跳过此步骤。如果您选择了其他可用区和交换机,则需要参考以下指导进行自定义路由的操作。

    1. 进入DataWorks管控台资源组列表页面,找到您要连通的独享数据集成资源组,单击资源组右侧的网络设置,进入资源组网络设置页面。

    2. 专有网络绑定页签下,找到刚才绑定的专有网络记录,单击右侧的自定义路由

    3. 在弹出的自定义路由窗口,单击新增路由,填写路由信息,关键配置信息如下。

      • 目的VPC:选择数据源所在的地域和专有网络。

      • 目的Switch实例:选择数据源所在的交换机。

步骤2 添加数据库白名单(以下以AliKafka为例)

  1. 获取需要绑定的白名单IP。

    • 如果您使用VPC内网连通独享资源组和数据源,则白名单内填写的网段为独享资源组上专有网络绑定时选择的交换机所在网段。您可以在独享资源组列表页面单击网络设置按钮,在专有网络绑定页面找到交换机所在网段。独享绑定的交换机网段

    • 如果您使用公网连通独享资源组和数据源,则白名单为独享资源组的EIP地址。您可以在独享资源组列表页面单击查看信息按钮,获得EIP地址。查看独享资源组EIP

  2. 数据源绑定白名单。

    1. Kafka绑定白名单。

      进入AliKafka产品控制台。找到您要进行数据同步的Kafka集群,单击目标实例名称,进入实例详情页面,在左侧导航中选择白名单管理,单击添加白名单分组按钮,绑定白名单。

    2. Hologres绑定白名单。

      进入HoloWeb。找到您要进行数据同步的Hologres数据源,在安全中心页面,单击IP白名单按钮。holo

创建同步任务

  1. 进入数据集成主站,单击同步任务进入同步任务页面,在页面中单击新增任务,开始配置同步任务。

  2. 配置同步任务基本信息。

    1. 任务名称:自定义。

    2. 同步类型:来源数据源选择Kafka,去向数据源选择Hologres,并选择实时ETL同步方案。

    3. 网络与资源配置:在下拉框中分别选择已创建的Kafka数据源、Hologres数据源、独享数据集成资源组,单击测试所有连通性,保障资源组与数据源之间的网络连通性。

  3. 配置Kafka来源信息。

    单击页面上方的Kafka来源,编辑Kafka来源信息。配置来源

    1. 配置Kafka基本信息。

      • 选择Kafka集群中需要同步的Topic。

      • 其他配置可使用任务创建时生成的默认值,也可根据需要进行修改。

    2. 单击右上角的数据采样

      在弹出对话框中指定好开始时间采样条数后,单击开始采集按钮,可以对指定的Kafka Topic进行数据采样,同时您可以预览Topic中的数据,为后续数据处理节点的数据预览和可视化配置提供输入。

  4. 编辑数据处理节点。

    单击添加图标可以增加数据处理方式。目前提供5种数据处理方式,您可根据需要做顺序编排,在任务运行时会按照编排的数据处理先后顺序执行数据处理,5种数据处理方式包括:数据脱敏字符串替换数据过滤JSON解析字段编辑与赋值数据处理每完成一个数据处理节点配置,可以单击右上角的数据输出预览按钮,在弹出对话框中,单击重新获取上游输出,模拟得到Kafka Topic采样数据经过当前数据处理节点处理后的结果。

    数据输出预览窗口,您可以根据需要修改输入数据,或者单击手工构造数据按钮自定义输入数据,然后单击预览按钮,查看当前数据处理节点对数据的处理结果,当数据处理节点处理异常,或者产生脏数据时,也会实时反馈异常信息,能够帮助您快速评估数据处理节点配置的正确性,以及是否能得到预期结果。

    说明

    数据输出预览强依赖Kafka来源的数据采样,在执行数据输出预览前需要先在Kafka来源表单中完成数据采样。

    输出预览

  5. 配置Hologres去向信息。

    单击页面上方的Hologres,编辑Hologres去向源信息。去向信息

    1. 配置基本信息。

      • 选择要写入的Hologres schema。

      • 选择要写入的Hologres表是自动建表还是使用已有表

      • 填写或者选择要写入的Hologres表名。

    2. 编辑建表结构。

      当选择自动建表时,您需要单击编辑表结构按钮,在弹框中编辑建表结构。同时,支持您单击根据上游节点输出列重新生成表结构按钮,自动根据上游节点输出列,生成表结构。您可以在自动生成的表结构中选择一列配置为主键。

      说明

      Hologres表必须有主键,否则无法保存配置。

      编辑建表结构

    3. 配置字段映射。

      保存建表结构或者选择使用已有表时,系统会自动按照同名映射原则生成上游列与Hologres表列之间的映射,您可根据需要进行调整,支持一个上游列映射到多个Hologres表列,不允许多个上游列映射到一个Hologres表列,当上游列未配置到Hologres表列的映射时,对应列不会写入Hologres表。字段映射

    4. 配置上游流入动态字段处理策略。

      上游流入动态字段处理策略用于控制上游数据处理节点(目前可以生成动态列的数据处理节点只有JSON解析)生成动态列的处理方式。如果在JSON解析节点配置了动态输出字段,则在Hologres节点中会出现上游流入动态字段处理策略表单。

      动态列指在任务配置中未明确定义列名,而是根据源端输入数据内容的不同,能够解析出不同列名和列值,并输出到Hologres节点的列。对上游流入动态字段处理策略如下表所示:

      参数

      描述

      加列

      如果在Hologres表中无与动态列同名的列,则触发Hologres表加列后将动态列写入。

      忽略

      如果在Hologres表中无与动态列同名的列,则忽略该动态列,将其他配置了映射关系的列写入Hologres表。

      报错

      如果在Hologres表中无与动态列同名的列,则同步任务报错停止。

  6. 高级参数配置。

    单击页面右上角的高级参数配置,对同步任务运行时的并行度和资源进行配置,您可以根据Kafka Topic数据流量和分区数量确定对应配置项的取值,建议按照如下规则进行配置:

    • 读端并发数=Kafka Topic分区数

    • 写端并发数=Kafka Topic分区数

    • 内存=1.5G+(256MB*Kafka Topic分区数)

    • Kafka Topic分区数大于12时开启分布式运行

    • 子任务数=分区数除以6向上取整

    • 单子任务读端并发数=6

    • 单子任务写端并发数=6

    • 单子任务内存占用=2GB

    由于同步任务的性能表现和资源占用受到源端和目标端系统数据流量、网络环境和系统负载等因素影响,基于上述简单规则,您可以根据实际情况做调整和修改。

  7. 报警配置。

    为能够及时感知到同步任务的异常并做出响应和处理,您可以对同步任务设置不同的报警策略。

    1. 单击右上角的报警配置,进入实时同步子任务报警设置页面。

    2. 单击新增报警,配置报警规则。

      报警规则设置可以参考实时同步任务告警设置最佳实践

      说明

      报警原因DLL通知时,适用的DDL只允许选择新增列,在同步任务解析出新的动态列时将触发报警(触发条件不是在Hologres表加列)。新增报警

    3. 管理报警规则。

      对于已创建的报警规则,您可以通过报警开关控制报警规则是否开启,同时,您可以根据报警级别报警给不同的接收人。报警规则

  8. 资源组配置。

    您可以在右上角的资源组配置处修改任务运行使用的独享数据集成资源组。

  9. 模拟运行。

    完成上述所有任务配置后,您可以通过模拟运行功能,模拟整个任务针对少量采样数据的处理,查看数据写入Hologres表后的结果。当任务配置错误、模拟运行过程中异常或者产生脏数据时,会实时反馈出异常信息,能够帮助您快速评估任务配置的正确性,以及是否能得到预期结果。模拟运行

    1. 单击页面右上角的模拟运行,在弹出的对话框中设置针对Kafka Topic的采样参数(开始时间采样条数)。

    2. 单击开始采集得到采样数据。

    3. 单击预览按钮,模拟整个任务针对少量采样数据的处理。

完成上述所有任务配置后,单击完成配置,完成同步任务的配置。

任务运维

启动同步任务

完成配置之后,界面会自动跳转到任务列表页面,您可以单击对应任务的操作列的启动按钮,启动同步任务。启动同步任务

查看任务运行状态

创建完成同步任务后,您可以在同步任务页面,找到已创建的同步任务,单击任务名称执行概况空白处,查看任务的运行详情。任务详情分为三个部分:任务详情

  • 基本信息:您可以查看同步任务的数据源信息、绑定的资源组等信息。

  • 执行状态:Kafka到Hologres的同步任务分为结构迁移实时数据同步两个步骤,您可以查看任务执行状态。

  • 详细信息:您可以查看结构迁移以及实时数据同步的执行详情。

    • 结构迁移中包含目标表的创建方式(已有表或自动建表),如果是自动建表,将会为您展示建表的DDL。

    • 实时数据同步中包含实时同步的统计信息,包含实时的读写流量、脏数据、Failover和运行日志。

任务重跑

  • 直接重跑

    不修改任务配置,直接单击同步任务操作列的更多 > 重跑操作,重跑一次性任务。

  • 修改后重跑

    编辑任务,进行修改操作后,单击完成。此时任务的操作会变成应用更新,单击应用更新会直接触发修改后的任务重跑。实时同步任务会按照新的配置运行。

  • 本页导读 (1)
文档反馈