全部产品
云市场

一键流式归档 HBase 数据到 Spark

更新时间:2019-09-19 17:00:13

X-Pack Spark 一键归档支持将 HBase数据通过增量日志方式归档到 X-Pack Spark 数据仓库(对在线业务稳定性无影响),在 X-Pack Spark集群进行数据计算性能大幅提升。同时相比传统的大数据Lambda架构(流、批计算、在线服务独立)架构升级为”大数据在线存储和计算一体化”,整体架构如下:

X-Pack Spark 一键归档模式推出了”大数据在线存储和计算一体化”架构,具有以下优点:

  • 数据一致性:避免双写;业务直接写hbase,数据自动归档到spark离线数仓
  • 稳定性:异步log同步到spark数仓,对hbase稳定性没有任何影响
  • 性能:spark分析列存,比直接分析hbase性能大幅提升

该版本限制:

1、本文档只是介绍如何归档 HBase 增量数据,关于 HBase 历史数据的归档后续会发布,增量数据和历史数据合并可以使用这个脚本进行。

2、本归档只支持归档 HBase 最新版本的数据,不支持多版本的数据归档。

3、目前支持天级别把hbase的增量日志 ETL处理后写入spark表,接下来会逐步支持小时级别以及分钟级别

4、由于 HBase 的动态列特点,我们默认使用 \u0001 代表不存在的列; \u0002 代表删除的列;如果这两个值在业务上已经使用了,建议修改默认值。

一、前置条件

购买 HBase 集群

如已经购买可以忽略,如未购买可以到 https://common-buy.aliyun.com/?commodityCode=hbasepre#/buy 页面购买 HBase 集群,比如本文购买好的 HBase 集群如下:

hbase

注意图中的 VPC ID 和 VSwitch ID,后面申请 BDS 和 Spark 集群均需要使用同样的 VPC ID 和 VSwitch ID。

购买 X-Pack Spark 集群

X-Pack Spark 集群主要用于合并 WAL 数据,把 HBase 列数据转换成行数据,归档后的数据就存储在 X-Pack Spark 集群中。本文购买好的 X-Pack Spark 集群如下:

spark

注意图中的 VPC ID 和 VSwitch ID,和上面 HBase 集群一样。

购买 BDS 服务

BDS 是阿里云针对 HBase 自主研发的一套迁移同步服务,主要帮助云上的客户进行自建 HBase、云HBase集群的数据导入和导出。BDS 可以用于 HBase 集群的无缝迁移、主备容灾、异地多活、在线离线业务分离、HBase 数据归档、对接 RDS 实时增量数据等等,方便云上客户围绕 HBase 构建高可用、灵活的业务系统。更多关于 BDS 服务介绍可以参见:https://help.aliyun.com/document_detail/120883.html

一键归档服务需要依赖 BDS 服务实时读取 HBase WAL 日志到 Spark 集群,供 Spark 归档 HBase 增量数据。本文购买好的 BDS 服务如下(购买基础配置就行,后续可以扩容):

BDS

注意图中的 VPC ID 和 VSwitch ID,和上面 HBase 集群一样。

关联集群

为了让归档顺利运行,我们需要到 BDS 里面关联 HBase 和 Spark 集群,以便可以读取对应 HBase 集群的 WAL 日志以及写 WAL 到 X-Pack Spark 集群中,主要步骤如下:到 BDS 控制台 -> 关联数据库,然后关联相关的 HBase 和 Spark 集群,如下图:

关联 HBase 集群

bds1

关联 X-Pack Spark 集群

bds2

创建测试表

我们到 HBase 集群创建好相关的测试表:

  1. hbase(main):002:0> create 'hbase2spark_archive_test', 'a','b'
  2. Created table hbase2spark_archive_test
  3. Took 0.7371 seconds
  4. => Hbase::Table - hbase2spark_archive_test
  5. hbase(main):003:0>

二、创建归档任务

前面我们已经准备好相关的测试环境及 HBase 表,现在我们可以创建归档任务了,具体步骤如下:

  1. 到 HBase X-Pack 控制台
  2. 选择左边菜单里面的一键归档
  3. 切换到新的页面之后选择创建归档
  4. 在弹出的对话框里面选择流式归档
  5. 填写归档名称,这里填的是 HBase_archives_test
  6. 点击确认

如下图所示

a

点击 确认 之后我们就进入到归档编辑页面中,里面我们可以填写相关的信息,具体如下:

ar

图中序号说明如下:

  1. 选择 Spark集群 ID,这里就是我们上面购买的 Spark 集群;
  2. 归档名称,这里就是我们创建归档时候填写的,如果觉得创建的时候名称写的不好,可以在这里修改;
  3. 选择 BDS 集群 ID,这就是我们上面购买的 BDS 服务;
  4. 数据源实例ID,这个就是我们需要归档的 HBase 表所在的集群 ID,也就是我们上面购买的 HBase 集群;
  5. 选择响应的归档类型,这里选择 BDS增量归档,T+1形式的归档,也就是今天归档 HBase 对应表昨天新增的数据;另外一种是 BDS 增全量归档,这个是把T+1增量数据和历史全量数据进行 Merge 的结果,由于目前 HBase 历史数据归档到 X-Pack Spark 还不支持,所以这个选项先忽略;
  6. 分区字段,这个就是归档到 X-Pack Spark 表的分区字段名称;
  7. 每天定时调度这里可以设置这个归档作业每天什么时候跑;
  8. 分区时间格式就是分区字段按天保存的时间格式,比如如果选择 yyyyMMdd 格式,那么我们的 X-Pack Spark 表分区字段就是 ds=20190904
  9. 源表名这里选择需要归档的 HBase 表,这里我们选择上面创建的 hbase2spark_archive_test 表即可;
  10. Spark库名就是将 HBase 的表归档到 X-Pack Spark 仓库的哪个数据库里面,默认为 default;
  11. spark表名这里可以设置将 HBase 的表归档到 X-Pack Spark 仓库里面哪张表里面,这里设置为 HBase_archives_test,这张表不需要我们去创建,这要提交完这个归档任务就会自动创建。
  12. 创建重名按钮可以查看归档到 X-Pack Spark 里面的 HBase_archives_test 是否已经存在,如果存在的话就无法提交归档任务,这时候可以用一个新的表或者到交互式工作台删除对应的表;
  13. 选择需要归档的列,其中最左边的 a、b就是 hbase2spark_archive_test 表对应的列族,右边需要用户自己填写对应的qualifier,同一列族多个qualifier,点击“增加”来添加列;
  14. 填写完相关信息之后,就可以点击提交按钮

点击完提交按钮之后,会在数据工作台里面的工作流创建相关的工作流,并且会在数据工作台里面的作业管理创建相关作业,具体如下:a111

同时也会在 X-Pack Spark 里面创建 HBase_archives_test 表,如下:

11

可以看到,X-Pack Spark 创建的 HBase_archives_test 表里面增加了一个 timestamp 字段,这个就是就是对应每行数据的最新更新版本(一般为数据插入到 HBase 的时间戳),除了 timestamp 和 ds 字段,其余我们选择需要归档的数据都是 binary 类型的,这个和 HBase 里面存储的数据一样,需要使用 Bytes.toInt 之类的函数转换成我们需要的数据。

按照上面的配置之后每天在spark表”HBase_archives_test”表中生成一个时间分区比如”20190905”里面会包含hbase写入这一天的数据

三、测试及数据查看

插入测试数据

上面我们已经在 HBase 里面创建了 hbase2spark_archive_test 表,并且创建了对应的归档任务,现在我们模拟真实业务,往 hbase2spark_archive_test 表里面插入数据:

  1. put 'hbase2spark_archive_test', '001', 'a:name', 'X-Pack spark'
  2. put 'hbase2spark_archive_test', '001', 'a:email', 'spark@alibaba-inc.com'
  3. put 'hbase2spark_archive_test', '001', 'b:address', '北京市朝阳区'
  4. put 'hbase2spark_archive_test', '001', 'b:tele', '13888888888'
  5. put 'hbase2spark_archive_test', '002', 'a:name', 'hbase'
  6. put 'hbase2spark_archive_test', '002', 'a:age', '100'
  7. put 'hbase2spark_archive_test', '002', 'b:address', '北京市海淀区'
  8. put 'hbase2spark_archive_test', '003', 'a:name', 'cassandra'
  9. put 'hbase2spark_archive_test', '003', 'a:age', '102'
  10. put 'hbase2spark_archive_test', '003', 'a:email', 'cassandra@alibaba-inc.com'
  11. put 'hbase2spark_archive_test', '003', 'b:address', '北京市朝阳区绿地中心'
  12. put 'hbase2spark_archive_test', '003', 'b:tele', '18888888888'
  13. put 'hbase2spark_archive_test', '001', 'b:tele', '16666666666'
  14. delete 'hbase2spark_archive_test', '002', 'a:age'

执行归档任务(这里一步主要可以用来首次的测试)

归档的正常逻辑是按照指定定时调度时间周期性的把数据ETL处理后写入到spark表,但是在测试阶段,可以写入数据,然后手动的运行一下作业看看效果。我们在上一步骤往 hbase2spark_archive_test 表插入了一些测试数据。过15分钟左右的时间,我们可以手动执行上面创建好的 HBase_archives_test 归档任务,如下:

121

在点击步骤❷ 运行的时候,会弹出一个对话框,由于数据是 T+1 归档的,所以 2019-09-04 时间往 hbase2spark_archive_test 表插入的测试数据需要在 2019-09-05 才能抽取到,所以我们需要把弹出对话框里面的时间设置为 2019-09-05 ,然后再点击确认按钮即可,这时候归档作业就会把 HBase 里面 hbase2spark_archive_test 表的数据归档到 X-Pack Spark 中 HBase_archives_test 里面。可以到运行记录里面查看作业的运行情况。

注意:如果 BDS 里面出现类似下面的异常

  1. FileNotFoundException: File does not exist: /hbase/oldWALs/hb-xxxxy-core-002.hbase.rds.aliyuncs.com%2C16020%2C1563257822007.1568186024374

说明需要采集的 WAL 日志被清理了,请到对应的 HBase 集群将 hbase.exporter.enabled 设置为 true , 在 HBase 2.0.7 之前需要重启 HBase 集群,之后版本不需要重启;或者延长 WAL 的保持时长,默认为10分钟,可以通过 hbase.master.logcleaner.ttl 参数修改。

在 X-Pack Spark 里面查询归档的数据

等作业运行完成之后,我们就可以查到刚刚归档的数据(创建sql类型的交互式查询):

可以先执行

  1. show partitions HBase_archives_test

查看下生成的分区情况:1正如图中显示的,X-Pack Spark 正确的归档了 20190904 这一天 HBase 新增的数据。我们在查看一下里面的数据情况:12

注意,把 HBase 的数据归档到 X-Pack Spark 的表中是以 Binary 的形式处理的,所以上面执行的结果都是字节码数据,我们可以编写一个 UDF(UDF 的编写可以参见 https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html ),将字节码转换成我们需要的数据,如下(创建scala类型的交互式查询):

12

  1. import org.apache.hadoop.hbase.util.Bytes
  2. val b2str = (s: Array[Byte]) => if (s != null){ val str = Bytes.toString(s); if(str == "\u0001") Some("\\u0001") else if(str =="\u0002") Some("\\u0002") else Some(str) }else None
  3. spark.udf.register("b2str", b2str)
  4. spark.sql("select b2str(rowKey),b2str(a_name),b2str(a_age),b2str(a_email),b2str(b_address),b2str(b_tele),timestamp,ds from HBase_archives_test order by ds,rowkey").show(200, 1000)

可以看出, X-Pack Spark 里面的 HBase_archives_test 表就是我们上面往 HBase 中 hbase2spark_archive_test 表插入的数据。

注意:大家肯定看到上面输出的结果有 \u0001 和 \u0002 两种特殊的标记,由于 HBase 里面具有动态列的特性,并不是每行都拥有全的列,所以数据归档到 Spark 里面我们默认使用 \u0001 特殊字符代表这列在 HBase 里面是不存在的。同时 HBase 里面可以删除单个列,为了区分哪列被删除,我们默认使用 \u0002 代表被删除的列。如果这两种特殊字符有业务含义,可以分别通过 spark.archive.default.valuespark.archive.delete.value 参数修改。

删除收集的 WAL 日志

当我们创建好一键归档 HBase 的数据,这时候会不停的将增量的数据采集到 Spark 上的 HDFS 中,为了不使这些采集的数据占用过多的磁盘空间,归档作业默认会清除两天之前的所有数据,如果有特殊需要,可以通过 spark.archive.delete.wal.days 参数设置需要清除多少天之前的数据。

比如我们将 spark.archive.delete.wal.days 设置为 -2,那么当我们归档 ds=20190904 这天的数据,ds=20190903 这天的 WAL 数据会保留,ds=20190902、ds=20190901… 等目录下的 WAL 数据会被清理。

例子

如何在代码里面区分出 \u0001 和 \u0002 两种特殊的标记呢?下面举一个值为 boolean 类型的数据解析逻辑:

  1. val bytes2Boolean = (s: Array[Byte]) => {
  2. if (s != null) {
  3. val str = Bytes.toString(s)
  4. if (str == "\u0001") { // 缺省值,也就是 HBase 中不存在这列
  5. None
  6. } else if(str == "\u0002") { // 删除的列
  7. None
  8. } else {
  9. Some(Bytes.toBoolean(s))
  10. }
  11. } else {
  12. None
  13. }
  14. }
  15. spark.udf.register("bytes2Boolean", bytes2Boolean)
  16. spark.sql("select rowkey,bytes2Boolean(a_boolean) from archives_test order by ds,rowkey").show(200, 1000)
  17. +-------------------+------------------+
  18. | rowkey|UDF:b2b(a_boolean)|
  19. +-------------------+------------------+
  20. |000000-000002685885| false|
  21. |000000-000128875498| true|
  22. |000000-000130503168| true|
  23. |000000-000378607622| true|
  24. |000000-000502385431| false|
  25. |000000-000630195268| true|
  26. |000000-000877290922| null|
  27. |000001-000255367603| false|
  28. |000001-000504639796| true|
  29. |000001-000755523231| false|
  30. +-------------------+------------------+
  31. only showing top 10 rows