本文为您介绍如何使用时序数据库InfluxDB连接器。

背景信息

时序数据库InfluxDB®版是一款专门处理高写入和查询负载的时序数据库,用于存储大规模的时序数据并进行实时分析,包括来自DevOps监控、应用指标和IoT传感器上的数据。时序数据库InfluxDB®版详情请参见InfluxDB®️介绍

InfluxDB连接器支持的信息如下。
类别详情
支持类型结果表
运行模式流模式
数据格式Point
特有监控指标
  • numRecordsOut
  • numRecordsOutPerSecond
  • currentSendTime
说明 指标的含义及如何查看监控指标,请参见查看监控指标
API种类SQL
是否支持更新或删除结果表数据

前提条件

已创建InfluxDB的数据库,详情请参见管理用户账号和数据库

使用限制

仅Flink计算引擎VVR 2.1.5及以上版本支持InfluxDB Connector。

语法结构

CREATE TABLE stream_test_influxdb(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `tag_value1` VARCHAR,
 `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='300',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);
建表默认格式:
  • 第0列:metric(VARCHAR),必填。
  • 第1列:timestamp(BIGINT),必填,单位为毫秒。
  • 第2列:tag_value1(VARCHAR),必填,最少填写一个。
  • 第3列:field_fieldValue1(DOUBLE),必填,最少填写一个。
    写入多个field_fieldValue时,您需要按照如下格式填写。
    field_fieldValue1 类型,
    field_fieldValue2 类型,
    ...      
    field_fieldValueN 类型
    示例如下。
    field_fieldValue1 DOUBLE,
    field_fieldValue2 INTEGER,
    ...      
    field_fieldValueN INTEGER
说明 结果表中只支持metrictimestamptag_*field_*,不能出现其他的字段。

WITH参数

参数说明是否必填备注
connector结果表类型。固定值为influxdb。
urlInfluxDB的服务地址。

在InfluxDB中,URL为VPC网络地址,例如:https://localhost:8086http://localhost:3242。

URL支持HTTP和HTTPS。

databaseInfluxDB的数据库名称。例如db-flink。
username数据库的用户名。需要对目标数据库有写权限。用户名详情请参见管理用户账号和数据库
password数据库的密码。密码详情请参见管理用户账号和数据库
batchSize批量提交的记录条数。默认每次批量提交300条记录。
retentionPolicy保留策略。如果您不配置该参数时,该参数会被默认填写为每个数据库的默认保留策略autogen,保留策略详情请参见管理用户账号和数据库
ignoreErrorData是否忽略异常数据。参数取值如下:
  • true:忽略异常数据。
  • false(默认值):不忽略异常数据。

类型映射

InfluxDB字段类型Flink版字段类型
BOOLEANBOOLEAN
INTINT
BIGINTBIGINT
FLOATFLOAT
DECIMALDECIMAL
DOUBLEDOUBLE
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP
VARCHARVARCHAR

使用示例

CREATE TEMPORARY TABLE datahub_source(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `filedvalue` DOUBLE,
 `tagvalue` VARCHAR
) WITH (
  'connector' = 'datagen',
  'fields.metric.length' = '3',
  'fields.tagvalue.length' = '3',
  'fields.timestamp.min' = '1587539547000',
  'fields.timestamp.max' = '1619075547000',
  'fields.filedvalue.min' = '1',
  'fields.filedvalue.max' = '100000',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE influxdb_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `field_fieldValue1` DOUBLE,
  `tag_value1` VARCHAR
) WITH (
  'connector' = 'influxdb',
  'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='100',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

INSERT INTO influxdb_sink
SELECT 
  `metric`,
  `timestamp`,
  `filedvalue`,
  `tagvalue`
FROM datahub_source;