本文为您介绍Elasticsearch(ES)维表DDL定义、WITH参数、CACHE参数和代码示例。
说明 该Connector支持的功能是和阿里云Elasticsearch产品对齐的,暂不支持HTTPS加密访问形式。
DDL定义
CREATE TABLE es_dim(
field1 LONG, ---作为JOIN时的key,必须为STRING类型。
field2 VARBINARY,
field3 VARCHAR
) WITH (
'connector' ='elasticsearch',
'endPoint' = '<yourEndPoint>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'indexName' = '<yourIndexName>',
'typeNames' = '<yourTypeName>'
);
说明
- 维表JOIN时的key有且只能有1个,且为Elasticsearch对应索引的文档ID。
- 支持Elasticsearch 5.5及以上版本。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
connector | 维表类型 | 是 | 固定值为elasticsearch 。
|
endPoint | Server地址 | 是 | 例如:http://127.0.0.1:9200。 |
accessId | Elasticsearch实例的用户名 | 否 | 无 |
accessKey | Elasticsearch实例的密码 | 否 | 无 |
indexName | 文档索引名称 | 是 | 无 |
typeNames | Type名称 | 否 | 默认值为_doc 。
说明 Elasticsearch 7.0以上版本不建议设置该参数。
|
CACHE参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
cache | 缓存策略 | 否 | Elasticsearch仅支持以下两种缓存策略:
|
cacheSize | 缓存大小 | 否 | 选择LRU 缓存策略后,可以设置缓存大小,默认值为10000行。
|
cacheTTLMs | 缓存更新时间间隔 | 否 | 默认缓存不超时,单位为毫秒。不同缓存策略下的功能如下:
|
cacheEmpty | 是否缓存空结果。 | 否 | 默认值为true。 |
类型映射
Flink以JSON来解析Elasticsearch数据,详情请参见数据类型映射关系。
代码示例
CREATE TEMPORARY TABLE datagen_source (
id STRING,
data STRING,
proctime as PROCTIME()
) with (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE es_dim (
id STRING,
`value` FLOAT
) WITH (
'connector' ='elasticsearch',
'endPoint' = '<yourEndPoint>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'indexName' = '<yourIndexName>',
'typeNames' = '<yourTypeName>'
);
CREATE TEMPORARY TABLE blackhole_sink (
id STRING,
data STRING,
`value` FLOAT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT e.*, w.*
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
在文档使用中是否遇到以下问题
更多建议
匿名提交