Apache Flume是一个分布式、可靠和高可用的系统,用于从大量不同的数据源有效地收集、聚合和移动大量日志数据,进行集中式的数据存储。Flume的核心是Agent,Agent中包含Source、Channel和Sink。本文为您介绍如何使用HDFS Sink写入数据至JindoFS。

前提条件

已创建集群,并选择了Flume服务。创建集群详情,请参见创建集群

背景信息

Apache Flume的HDFS Sink通过Flush接口保证事务性写入。JindoFS Block模式从SmartData 3.2.x及后续版本开始默认支持Flush接口,您可以直接配置Sink。

OSS本身不支持Flush接口,SmartData 3.4.x及后续版本支持Flume可恢复性写入JindoFS Cache模式或OSS。通过支持Flush接口,虽然不能让数据立刻可见,但是可以确保数据暂存在云端,当写入程序发生Crash时,您可以通过调用SDK或命令行,恢复程序在Crash之前已经写入JindoFS Cache模式或OSS的数据。

使用Flume写入JindoFS Block模式

您需要配置Sink,各参数含义请参见Apache Flume。配置Sink示例如下:
# 配置JFS Sink

xxx.sinks.jfs_sink.hdfs.path = jfs://${your_ns_name}/flume_dir/%H%M/%S
xxx.sinks.jfs_sink.hdfs.round =true
xxx.sinks.jfs_sink.hdfs.roundValue = 15
xxx.sinks.jfs_sink.hdfs.Unit = minute
xxx.sinks.jfs_sink.hdfs.filePrefix = your_topic

# Sink参数,batchSize需要设置大一些,建议每次Flush的量在32MB以上,否则会影响性能。
xxx.sinks.jfs_sink.hdfs.batchSize = 100000
...
xxx.sinks.jfs_sink.rollSize = 3600
xxx.sinks.jfs_sink.threadsPoolSize = 30
xxx.sinks.jfs_sink.fileType = DataStream
xxx.sinks.jfs_sink.writeFormat = Text
说明 本文示例中的your_ns_name为您NameSpace的名称。

使用Flume写入JindoFS Cache模式或OSS

  1. 开启Flush和Recover功能。
    具体步骤,请参见开启Flush和Recover功能
  2. 配置Sink。
    各参数含义请参见Apache Flume。配置Sink示例如下:
    # 配置OSS Sink
    
    xxx.sinks.oss_sink.hdfs.path = oss://${your_bucket}/flume_dir/%H%M/%S
    xxx.sinks.oss_sink.hdfs.round = true
    xxx.sinks.oss_sink.hdfs.roundValue = 15
    xxx.sinks.oss_sink.hdfs.Unit = minute
    xxx.sinks.oss_sink.hdfs.filePrefix = <your_topic>
    
    # Sink参数,batchSize需要设置大一些,建议每次Flush的量在32MB以上,否则会影响性能。
    xxx.sinks.oss_sink.hdfs.batchSize = 100000
    ...
    xxx.sinks.oss_sink.rollSize = 3600
    xxx.sinks.oss_sink.threadsPoolSize = 30
    说明 本文示例中的your_bucket为您OSS Bucket的名称。
  3. 恢复文件Flush的数据。
    具体步骤,请参见恢复文件Flush的数据

开启Flush和Recover功能

使用Flume写入JindoFS Cache模式时,需要开启Flush和Recover功能。

  1. 进入SmartData服务。
    1. 登录阿里云E-MapReduce控制台
    2. 在顶部菜单栏处,根据实际情况选择地域和资源组
    3. 单击上方的集群管理页签。
    4. 集群管理页面,单击相应集群所在行的详情
    5. 在左侧导航栏单击集群服务 > SmartData
  2. 进入smartdata-site页面。
    1. 单击配置页签。
    2. 服务配置页面,单击storage
  3. 单击右上角的自定义配置,添加如下参数。
    参数 描述
    fs.jfs.cache.oss.flush.enable 是否开启Flush和Recover功能,开启时需要设置为true。
    fs.jfs.cache.flush.staging.path Flush的数据和Manifest的暂存区,默认值为/tmp。

    例如:在使用默认值时,如果文件的写入路径是:oss://test-bucket/dir1/file1,则Staging的路径为oss://test-bucket/tmp/dir1/file1

  4. 保存配置。
    1. 单击右上角的保存
    2. 确认修改对话框中,输入执行原因,开启自动更新配置
    3. 单击确定

恢复文件Flush的数据

您可以使用Recover命令恢复数据。
jindo jfs -recover [-R]
                   [-flushStagingPath {flushStagingPath}]
                   [-accessKeyId ${accessKeyId}]
                   [-accessKeySecret ${accessKeySecret}]
                   <path>
参数 描述
-R 是否递归Recover,恢复文件夹时需要添加该参数。
-flushStagingPath Flush的数据和Manifest的暂存区,默认值为/tmp。

例如:在使用默认值时,如果文件的写入路径是:oss://test-bucket/dir1/file1,则Staging的路径为oss://test-bucket/tmp/dir1/file1

-accessKeyId 阿里云账号的AccessKey ID。
-accessKeySecret 阿里云账号的AccessKey Secret。
path Flush的数据的写入路径。

例如:oss://test-bucket/dir1/file1oss://test-bucket/dir1

您也可以通过JindoOssFileSystem的Recover接口恢复文件或目录。本示例为恢复文件夹。
JindoOssFileSystem jindoFileSystem = (JindoOssFileSystem) fs;
boolean isFolder = true;
jindoFileSystem.recover(path, isFolder);
说明 代码中的path表示待恢复文件的路径,isFolder表示是否需要递归恢复。如果是恢复文件夹,需要设置为true。如果是恢复文件,需要设置为false。