本文以Windows系统,Flink 1.10版本为例,为您介绍如何使用Flink作业实时同步DataHub数据至阿里云HBase。
作业开发
- 下载并解压Hbase_ASI_Demo-main示例到本地。
- 在Intellij IDEA中,单击,打开刚才解压缩完成的Hbase_ASI_Demo-main。
- 双击打开\Hbase_ASI_Demo-main\src\main\java\Hbase_Demo后,修改HbaseDemo.java文件中的DataHub与HBase相关参数。
//DataHub相关参数
//private static String endPoint ="public endpoint";//公网访问(填写内网Endpoint,就不用填写公网Endpoint)。
private static String endPoint = "inner endpoint";//内网访问。
private static String projectName = "yourProject";
private static String topicSourceName = "yourTopic";
private static String accessId = "yourAK";
private static String accessKey = "yourAS";
private static Long datahubStartInMs = 0L;//设置消费的启动位点对应的时间。
//HBase相关参数
private static String zkQuorum = "yourZK";
private static String tableName = "yourTable";
private static String columnFamily = "yourcolumnFamily";
存储类型 |
参数 |
说明 |
DataHub |
endPoint |
替换成您DataHub的endPoint。
说明 如果填写内网Endpoint,就不用填写公网Endpoint。
|
projectName |
替换成您DataHub的Project名称。 |
topicSourceName |
替换成您DataHub的Topic名称。 |
accessId |
替换成您阿里云账号的AccessKey ID。 |
accessKey |
替换成您阿里云账号的AccessKey Secret。 |
HBase |
zkQuorum |
替换成您HBase的ZK链接地址。您可以通过以下方式查看ZK链接地址信息:
- 在HBase控制台目标实例详情左侧导航栏,单击数据库连接后,在连接信息区域查看。
- 在HBase安装目录/conf/hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。
|
tableName |
替换成您创建的HBase表名。 |
columnFamily |
替换成您创建的HBase表列簇名。 |
- 在下载文件中pom.xml所在的目录执行如下命令打包文件。
mvn package -Dcheckstyle.skip
根据您pom.xml文件中artifactId的信息,\Hbase_ASI_Demo-main\target\目录下会出现Hbase_Demo-1.0-SNAPSHOT.jar的JAR包,即代表完成了开发工作。
查看结果
- 构建测试数据。
在Flink全托管新建并启动SQL作业,将结果数据发送至DataHub中,作为该最佳实践的测试数据。
CREATE TEMPORARY TABLE datagen_source (
a BOOLEAN
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE datahub_sink (
a BOOLEAN,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'datahub',
'endPoint' = '<yourEndpoint>', --替换为您开通的Datahub服务的endPoint。
'project' = '<yourProject>', --替换为您开通的Datahub服务的Project名称。
'topic' = '<yourTopic>', --替换为您开通的Datahub服务Topic名称。
'accessId'='<yourAccessId>', --替换为阿里云账号的AccessKey ID。
'accessKey'='<yourAccessKey>' --替换为阿里云账号的AccessKey Secret。
);
INSERT INTO datahub_sink
SELECT
a,'rowkey3' as b,'123' as c
FROM datagen_source;
- 连接HBase集群。
- 执行
scan 'hbase_sink'
查看写入数据。出现类似如下输出,则表示已经成功将DataHub数据写入阿里云HBase。

在文档使用中是否遇到以下问题
更多建议
匿名提交