本文为您介绍如何创建Oracle数据库源表,以及创建源表时使用的WITH参数、类型映射和属性字段。

什么是Oracle数据库

Oracle数据库是市场上广泛应用的关系型数据库。
注意
  • 仅支持使用Oracle 11g版本创建Oracle数据库源表。
  • 仅支持在实时计算3.2.2及以上版本创建Oracle数据库源表。
  • 如果源表数据量较小,则您需要在Blink 3.2.x和3.3.x版本配置queryIntervalMs参数为5000
  • 不能修改Oracle Source节点的并发,默认一个Table对应一个并发。

语法示例

实时计算支持使用Oracle数据库作为源表,代码示例如下。
create table oracle_source (
    EMPLOYEE_ID BIGINT,
    START_DATE TIMESTAMP,
    END_DATE TIMESTAMP,
    JOB_ID VARCHAR,
    DEPARTMENT_ID VARCHAR
) with (
    type = 'oracle',
    url = 'jdbc:oracle:thin:@//127.0.0.1:1521/ORACLE',
    userName = 'userName',
    password = 'password',
    dbName = 'hr',
    tableName = 'job_history',
    timeField = 'START_DATE',
    startTime = '2007-1-1 00:00:00'
);

WITH 参数

参数 描述 是否必选 示例值
type 源表类型 oracle
url 数据库连接串,固定句式为jdbc:oracle:thin:@//数据库IP:端口号/数据库名 jdbc:oracle:thin:@//127.0.0.1:1521/XE
userName 登录数据库的用户名
password 登录数据库的密码
tableName 数据库的表名。数据库的表名有以下两种表达方式:
  • 表名1,表名2
  • 数据库名.表名1,表名2
说明 多个表名之间用逗号(,)隔开。
  • table1,table2
  • db1.table1,table2
timeField 更新数据库的时间
dbName 数据库名 如果tableName参数配置了数据库名,则dbName不需要重复配置。
startTime 读取数据的开始时间 2019-5-15 00:00:00
timeZone 数据库时区 Asia/Shanghai", "UTC
queryTimeRangeMs 获取数据的时长,单位为毫秒。
说明 queryTimeRangeMs参数的取值需要大于queryIntervalMs参数。
默认值为5000。
queryIntervalMs 查询数据库的时间间隔,单位为毫秒。 默认值为100。
connectionMaxActive 最大活跃连接数 默认值为10。
maxRetry 最大连接失败重试次数 默认值为3。
escapeFields 是否对数据库字段名进行转义。 escapeFields参数的取值如下:
  • false(默认值),不区分大小写。
  • true,区分大小写。
lengthCheck 单行字段条数的检查策略 lengthCheck参数的取值如下:
  • NONE(默认值):
    • 当解析的字段个数大于定义个数时,按从左到右的顺序,取定义字段的数据使用。
    • 当解析的字段个数小于定义个数时,跳过当前行数据。
  • SKIP:当解析的字段个数和定义个数不同时,跳过当前行数据。
  • EXCEPTION:当解析的字段个数和定义个数不同时,系统提示异常。
  • PAD
    • 当解析的字段个数大于定义个数时,按从左到右的顺序,取定义字段的数据使用。
    • 当解析的字段个数小于定义个数时,按从左到右顺序,在行尾用null填充缺少的字段。
columnErrorDebug 是否打开调试开关。
说明 如果打开调试开关,则会将解析异常的日志打印出来。
默认值为false。

类型映射

Oracle字段类型 实时计算字段类型
  • CHAR
  • VARCHAR
  • VARCHAR2
VARCHAR
FLOAT DOUBLE
NUMBER BIGINT
DECIMAL DECIMAL

代码示例

实时计算包含Oracle数据库源表的代码示例如下。
create table oracle_source (
    EMPLOYEE_ID BIGINT,
    START_DATE TIMESTAMP,
    END_DATE TIMESTAMP,
    JOB_ID VARCHAR,
    DEPARTMENT_ID VARCHAR
) with (
    type = 'oracle',
    url = 'jdbc:oracle:thin:@//127.0.0.1:1521/ORACLE',
    userName = 'userName',
    password = 'password',
    dbName = 'hr',
    tableName = 'job_history',
    timeField = 'START_DATE',
    startTime = '2007-1-1 00:00:00'
);

create table test_out(
    EMPLOYEE_ID BIGINT,
    START_DATE TIMESTAMP,
    END_DATE TIMESTAMP,
    JOB_ID VARCHAR,
    DEPARTMENT_ID VARCHAR
) with (
  type='print'
);

INSERT INTO test_out
SELECT 
 EMPLOYEE_ID,
 START_DATE,
 END_DATE,
 JOB_ID,
 DEPARTMENT_ID
from oracle_source;

常见问题

Q:查询不到数据该如何处理?

A:
  • 问题原因:Blink运行故障。
    解决方法:查看TaskManager的Round start:[{}], end:[{}]Round read records日志,如果未查询到日志数据,则Blink运行故障。您可以参见运行Failover解决Blink运行故障。
    说明
    • Round start:[{}], end:[{}] 用来显示查询数据的起始时间戳。
    • Round read records用来显示查询到的数据记录。
  • 问题原因:如果您使用的Blink为3.2.x和3.3.x版本,查询结束时间超过当前时间。

    解决方法:您需要配置queryIntervalMs5000