本文为您介绍InfluxDB结果表的DDL定义、WITH参数、类型映射和代码示例。

什么是时序数据库InfluxDB®版

时序数据库InfluxDB®版是一款专门处理高写入和查询负载的时序数据库,用于存储大规模的时序数据并进行实时分析,包括来自DevOps监控、应用指标和IoT传感器上的数据。时序数据库InfluxDB®版详情请参见InfluxDB®️介绍

前提条件

已创建InfluxDB的数据库,详情请参见管理用户账号和数据库

使用限制

仅Flink计算引擎VVR 2.1.5及以上版本支持InfluxDB Connector。

DDL定义

Flink支持使用InfluxDB作为结果表输出,示例代码如下。
CREATE TABLE stream_test_influxdb(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `tag_value1` VARCHAR,
 `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='300',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);
建表默认格式:
  • 第0列:metric(VARCHAR),必填。
  • 第1列:timestamp(BIGINT),必填,单位为毫秒。
  • 第2列:tag_value1(VARCHAR),必填,最少填写一个。
  • 第3列:field_fieldValue1(DOUBLE),必填,最少填写一个。
    写入多个field_fieldValue时,您需要按照如下格式填写。
    field_fieldValue1 类型,
    field_fieldValue2 类型,
    ...      
    field_fieldValueN 类型
    示例如下。
    field_fieldValue1 DOUBLE,
    field_fieldValue2 INTEGER,
    ...      
    field_fieldValueN INTEGER
说明 结果表中只支持metrictimestamptag_*field_*,不能出现其他的字段。

WITH参数

参数 说明 是否必填 备注
connector 结果表类型。 固定值为influxdb。
url InfluxDB的服务地址。

在InfluxDB中,url为VPC网络地址,例如:https://localhost:8086http://localhost:3242。

url支持HTTP和HTTPS。

database InfluxDB的数据库名称。 例如db-flink。
username 数据库的用户名。 需要对目标数据库有写权限。用户名详情请参见管理用户账号和数据库
password 数据库的密码。 密码详情请参见管理用户账号和数据库
batchSize 批量提交的记录条数。 默认每次批量提交300条记录。
retentionPolicy 保留策略。 如果您不配置该参数时,该参数会被默认填写为每个数据库的默认保留策略autogen,保留策略详情请参见管理用户账号和数据库
ignoreErrorData 是否忽略异常数据。 取值如下:
  • true:忽略异常数据。
  • false(默认值):不忽略异常数据。

类型映射

InfluxDB字段类型 Flink版字段类型
BOOLEAN BOOLEAN
INT INT
BIGINT BIGINT
FLOAT FLOAT
DECIMAL DECIMAL
DOUBLE DOUBLE
DATE DATE
TIME TIME
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR

代码示例

CREATE TEMPORARY TABLE datahub_source(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `filedvalue` DOUBLE,
 `tagvalue` VARCHAR
) WITH (
  'connector' = 'datagen',
  'fields.metric.length' = 3,
  'fields.tagvalue.length' = 3,
  'fields.timestamp.min' = 1587539547000,
  'fields.timestamp.max' = 1619075547000,
  'fields.filedvalue.min' = 1,
  'fields.filedvalue.max' = 100000,
  'rows-per-second' = 50
);

CREATE TEMPORARY TABLE influxdb_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `field_fieldValue1` DOUBLE,
  `tag_value1` VARCHAR
) WITH (
  'connector' = 'influxdb',
  'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='100',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

INSERT INTO influxdb_sink
SELECT 
   `metric`,
   `timestamp`,
   `filedvalue`,
   `tagvalue`
FROM datahub_source;