本文为您介绍云数据库MongoDB版结果表DDL定义、WITH参数和代码示例。

什么是云数据库MongoDB

云数据库MongoDB完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份恢复、性能优化等功能。

前提条件

已创建云数据库MongoDB实例,详情请参见创建单节点实例

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持云数据库MongoDB Connector。
  • 仅支持将云数据库MongoDB作为结果表使用。

DDL定义

CREATE TABLE mongodb_sink(
  id INT, 
  number INT
) WITH (
  'connector' = 'mongodb',
  'database' = '<yourDatabase>',
  'collection' = '<yourCollection>', 
  'uri' = '<yourUri>',
  'maxConnectionIdleTime' = '<yourMaxConnectionIdleTime>',  
  'batchSize' = '1024'  
);

WITH参数

参数 说明 是否必填 备注
connector 结果表类型。 固定值为mongodb
database 数据库名称。 无。
collection 数据集合。 无。
uri MongoDB连接串。 例如mongodb://123@dds-/admin?replicaSet=mgset-32966591
maxConnectionIdleTime 连接超时时长。 默认值为60000,单位为毫秒。
batchSize 每次批量写入的条数。 默认值为1024。

代码示例

CREATE TEMPORARY TABLE datagen_source (
  v INT, 
  p INT
) with (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongodb_sink(
  id INT, 
  number INT
) with (
  'connector'='mongodb',
  'database' = '<yourDatabase>',
  'collection' = '<yourCollection>', 
  'uri'='<yourUri>'
);

INSERT INTO mongodb_sink 
   SELECT v, p
FROM datagen_source;