本文为您介绍如何使用DataWorks数据集成,将Kafka集群上的数据迁移至MaxCompute。

前提条件

  • 开通MaxCompute
  • 开通DataWorks
  • 在DataWorks上完成创建业务流程,本例使用DataWorks简单模式。详情请参见创建业务流程
  • 搭建Kafka集群

    进行数据迁移前,您需要保证自己的Kafka集群环境正常。本文使用阿里云EMR服务自动化搭建Kafka集群,详细过程请参见Kafka快速入门

    本文使用的EMR Kafka版本信息如下:
    • EMR版本:EMR-3.12.1
    • 集群类型:Kafka
    • 软件信息:Ganglia 3.7.2 ZooKeeper 3.4.12 Kafka 2.11-1.0.1 Kafka-Manager 1.3.3.16

    Kafka集群使用专有网络,区域为华东1(杭州),主实例组ECS计算资源配置公网及内网IP。

背景信息

Kafka是一款分布式发布与订阅的消息中间件,具有高性能、高吞量的特点被广泛使用,每秒能处理上百万的消息。Kafka适用于流式数据处理,主要应用于用户行为跟踪、日志收集等场景。

一个典型的Kafka集群包含若干个生产者(Producer)、Broker、消费者(Consumer)以及一个Zookeeper集群。Kafka集群通过Zookeeper管理自身集群的配置并进行服务协同。

Topic是Kafka集群上最常用的消息的集合,是一个消息存储逻辑概念。物理磁盘不存储Topic,而是将Topic中具体的消息按分区(Partition)存储在集群中各个节点的磁盘上。每个Topic可以有多个生产者向它发送消息,也可以有多个消费者向它拉取(消费)消息。

每个消息被添加到分区时,会分配一个Offset(偏移量,从0开始编号),是消息在一个分区中的唯一编号。

步骤一:准备Kafka数据

您需要在Kafka集群创建测试数据。为保证您可以顺利登录EMR集群Header主机,以及保证MaxCompute和DataWorks可以顺利和EMR集群Header主机通信,请您首先配置EMR集群Header主机安全组,放行TCP 22及TCP 9092端口。

  1. 登录EMR集群Header主机地址。
    进入EMR Hadoop控制台集群管理 > 主机列表页面,确认EMR集群Header主机地址,并通过SSH连接远程登录。
  2. 创建测试Topic。
    执行如下命令创建测试所使用的Topic testkafka。
    [root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka  --create
    Created topic "testkafka".
  3. 写入测试数据。
    执行如下命令,可以模拟生产者向Topic testkafka中写入数据。由于Kafka用于处理流式数据,您可以持续不断的向其中写入数据。为保证测试结果,建议写入10条以上的数据。
    [root@emr-header-1 ~]# kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka
    >123
    >abc
    >
    您可以同时再打开一个SSH窗口,执行如下命令,模拟消费者验证数据是否已成功写入Kafka。当数据写入成功时,您可以看到已写入的数据。
    [root@emr-header-1 ~]# kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning
    123
    abc

步骤二:在DataWorks上创建目标表

在DataWorks上创建目标表用以接收Kafka数据。

  1. 进入数据开发页面。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击工作空间列表
    3. 单击相应工作空间后的进入数据开发
  2. 右键单击业务流程,选择新建 > MaxCompute >
  3. 新建表页面,选择引擎类型并输入表名
  4. DDL模式对话框,输入如下建表语句,单击生成表结构
    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition1      string,
     timestamp1      string,
     offset          string,
     t123            string,
     event_id        string,
     tag             string
    ) ;
    其中的每一列,对应于DataWorks数据集成Kafka Reader的默认列:
    • __key__表示消息的key。
    • __value__表示消息的完整内容 。
    • __partition__表示当前消息所在分区。
    • __headers__表示当前消息headers信息。
    • __offset__表示当前消息的偏移量。
    • __timestamp__表示当前消息的时间戳。

    您还可以自主命名,详情参见Kafka Reader

  5. 单击提交到生产环境确认

步骤三:同步数据

  1. 新建自定义资源组。

    由于当前DataWorks的默认资源组无法完美支持Kafka插件,您需要使用自定义资源组完成数据集成。自定义资源组详情请参见新增自定义数据集成资源组

    在本文中,为节省资源,直接使用EMR集群Header主机作为自定义资源组。完成后,请等待服务器状态变为可用

  2. 新建数据集成节点。
    1. 进入数据开发页面,右键单击指定业务流程,选择新建 > 数据集成 > 离线同步
    2. 新建节点对话框中,输入节点名称,并单击提交
  3. 在顶部菜单栏上,单击转化脚本图标。
  4. 在脚本模式下,单击顶部菜单栏上的**图标。
  5. 配置脚本,示例代码如下。
    {
        "type": "job",
        "steps": [
            {
                "stepType": "kafka",
                "parameter": {
                    "server": "47.xxx.xxx.xxx:9092",
                    "kafkaConfig": {
                        "group.id": "console-consumer-83505"
                    },
                    "valueType": "ByteArray",
                    "column": [
                        "__key__",
                        "__value__",
                        "__partition__",
                        "__timestamp__",
                        "__offset__",
                        "'123'",
                        "event_id",
                        "tag.desc"
                    ],
                    "topic": "testkafka",
                    "keyType": "ByteArray",
                    "waitTime": "10",
                    "beginOffset": "0",
                    "endOffset": "3"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "truncate": true,
                    "compress": false,
                    "datasource": "odps_first",
                    "column": [
                        "key",
                        "value",
                        "partition1",
                        "timestamp1",
                        "offset",
                        "t123",
                        "event_id",
                        "tag"
                    ],
                    "emptyAsNull": false,
                    "table": "testkafka"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "throttle": false,
                "concurrent": 1,
            }
        }
    }
    您可以通过在Header主机上使用kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list命令查看group.id参数,及消费者的Group名称。
    [root@emr-header-1 ~]#  kafka-consumer-groups.sh  --bootstrap-server emr-header-1:9092  --list
    Note: This will not show information about old Zookeeper-based consumers.
    
    _emr-client-metrics-handler-group
    console-consumer-69493
    console-consumer-83505
    console-consumer-21030
    console-consumer-45322
    console-consumer-14773
    console-consumer-83505为例,您可以根据该参数在Header主机上使用kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505命令确认beginOffsetendOffset参数。
    [root@emr-header-1 ~]# kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
    Note: This will not show information about old Zookeeper-based consumers.
    Consumer group 'console-consumer-83505' has no active members.
    TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
    testkafka                      6          0               0               0          -                                                 -                              -
    test                           6          3               3               0          -                                                 -                              -
    testkafka                      0          0               0               0          -                                                 -                              -
    testkafka                      1          1               1               0          -                                                 -                              -
    testkafka                      5          0               0               0          -                                                 -                              -
  6. 配置调度资源组。
    1. 在右侧导航栏,单击调度配置
    2. 资源属性区域,选择调度资源组为您创建的自定义资源组。
      说明 如果您需要将Kafka的数据周期性(例如每小时)写入MaxCompute,您可以使用beginDateTimeendDateTime参数,设置数据读取的时间区间为1小时,然后每小时调度一次数据集成任务。详情请参见Kafka Reader
  7. 单击**图标运行代码。
  8. 您可以在运行日志查看运行结果。

后续步骤

您可以新建一个ODPS SQL节点运行SQL语句,查看从Kafka同步数据至MaxCompute是否成功。详情请参见使用临时查询运行SQL语句(可选)