本文为您介绍Postgres的CDC(Change Data Capture)源表DDL定义、WITH参数、类型映射和代码示例。

什么是Postgres的CDC源表

Postgres的CDC源表,即Postgres的流式源表,用于依次读取PostgreSQL数据库全量快照数据和变更数据,保证不多读一条也不少读一条数据。即使发生故障,也能采用Exactly Once方式处理。
说明 如果您使用的是开源的PostgreSQL数据库,Flink全托管的Postgres CDC connector支持读取的Postgres版本为9.6及以上版本。

DDL定义

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

WITH参数

参数 说明 是否必填 数据类型 备注
connector 源表类型 STRING 固定值为postgres-cdc
hostname Postgres数据库的IP地址或者Hostname。 STRING
username Postgres数据库服务的用户名。 STRING
password Postgres数据库服务的密码 STRING
database-name 数据库名称 STRING 数据库名称支持正则表达式以读取多个数据库的数据。
schema-name Postgres Schema名称 STRING Schema名称支持正则表达式以读取多个Schema的数据。
table-name Postgres表名 STRING 表名支持正则表达式去读取多个表的数据。
port Postgres数据库服务的端口号 INTEGER 默认值为5432。
decoding.plugin.name Postgres Logical Decoding插件名称 STRING 根据Postgres服务上安装的插件确定。支持的插件列表如下:
  • decoderbufs(默认值)
  • wal2json
  • wal2json_rds
  • wal2json_streaming
  • wal2json_rds_streaming
  • pgoutput
说明 如果您使用的是阿里云RDS PostgreSQL,你需要开启逻辑解码(wal2json)功能,详情请参见逻辑解码(wal2json)
debezium.* Debezium属性参数 STRING 更细粒度控制Debezium客户端的行为。例如'debezium.snapshot.mode' = 'never',详情请参见配置属性
说明 建议每个表都设置debezium.slot.name参数,以避免出现PSQLException: ERROR: replication slot "debezium" is active for PID 974报错。

类型映射

Postgres CDC和实时计算Flink版字段类型对应关系如下。
Postgres CDC字段类型 实时计算Flink版字段类型
SAMLLINT SMALLINT
INT2
SMALLSERIAL
SERIAL2
INTEGER INT
SERIAL
BIGINT BIGINT
BIGSERIAL
BIGINT BIGINT
REAL FLOAT
FLOAT4
FLOAT8 DOUBLE
DOUBLE PRECISION
NUMERIC(p, s) DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE
TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n) STRING
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
BYTEA BYTES