使用Flink处理数据并写入ClickHouse,除了可以使用Flink原生的JDBC connector以外,本节介绍了使用flink-connector-clickhouse方式将Flink中的数据写入ClickHouse。这种方式支持Flink SQL,可以使用Flink Table DDL语句定义ClickHouse表,然后通过INSERT语句把数据写入ClickHouse。

前提条件

Flink版本为1.11.0及之后的版本

操作步骤

  1. mvn archetype:generate 命令创建项目,生成过程中会提示输入 group-idartifact-id 等。
    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.11.0
  2. 编辑 pom.xml 中的<dependencies />小节添加依赖。
        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>flink-connector-clickhouse</artifactId>
          <version>1.11.0</version>
        </dependency>
  3. 创建数据写入程序文件。

    因为flink-connector-clickhouse支持Flink SQL,您可以使用Flink Table DDL语句定义ClickHouse Sink表。

    CREATE TABLE sink_table (
        name VARCHAR,
        grade BIGINT,
        rate FLOAT,
        more FLOAT,
        PRIMARY KEY (name, grade) NOT ENFORCED /* 如果指定 pk,进入 upsert 模式 */
    ) WITH (
        'connector' = 'clickhouse',
        'url' = 'clickhouse://<host>:<port>',
        'username' = '<username>',
        'password' = '<password>',
        'database-name' = 'default',        /* ClickHouse 数据库名,默认为 default */
        'table-name' = 'd_sink_table',      /* ClickHouse 数据表名 */
        'sink.batch-size' = '1000',         /* batch 大小 */
        'sink.flush-interval' = '1000',     /* flush 时间间隔 */
        'sink.max-retries' = '3',           /* 最大重试次数 */
        'sink.write-local' = 'true',        /* 是否直写 local 表 */
        'sink.partition-strategy' = 'hash', /* hash | random | balanced */
        'sink.partition-key' = 'name',      /* hash 策略下的分区键 */
        'sink.ignore-delete' = 'true'       /* 忽略 DELETE 并视 UPDATE 为 INSERT */
    )
    参数 描述
    database-name ClickHouse 数据库名,默认为 default
    table-name ClickHouse 数据表名
    sink.batch-size batch大小需要根据您的实际业务场景进行决定,或进行测试。在我们的测试中,一般 8000左右的 batch size 可以达到较为理想的性能。
    sink.flush-interval flush时间间隔
    sink.max-retries 最大重试次数
    sink.write-local 是否直写local表
    • 关闭时(设置为 false),Flink 将写入 Distributed 表,再由 ClickHouse 做 shard 分区,吞吐量受单节点限制。
    • 开启时(设置为 true),connector 自动查询对应的 cluster 和 local 表,数据直接写入 Distributed 表所对应 cluster 各 shard 的 local 表,能够达到较高的写入性能。
    sink.partition-strategy 分区策略
    • balanced:round-robin 轮转。
    • random:随机分区。
    • hash:hash 分区,通过 sink.partition-key 指定分区键。
    sink.partition-key hash策略下的分区键
    sink.ignore-delete 默认值为TRUE,表示忽略 DELETE 并视 UPDATE 为 INSERT。ClickHouse 对 UPDATE 和 DELETE 的支持并不完善,例如不支持同步更新等。若显式设置 sink.ignore-delete 为 false,则会尝试使用 ALTER TABLE UPDATE/DELETE 来更新数据,此时性能会有显著下降。

    通过INSERT语句把数据写入ClickHouse。

    INSERT INTO sink_table SELECT name, grade, rate FROM source

    数据写入程序文件完整示例如下:

    package org.myorg.quickstart
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<your-path-to-test-csv>/source.csv"
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        env.getConfig.disableClosureCleaner
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", DataTypes.STRING())
          .field("age", DataTypes.BIGINT())
          .field("sex", DataTypes.STRING())
          .field("grade", DataTypes.BIGINT())
          .field("rate", DataTypes.FLOAT())
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val create_sql =
          s"""
          | CREATE TABLE sink_table (
          |    name VARCHAR,
          |    grade BIGINT,
          |    rate FLOAT,
          |    PRIMARY KEY (name) NOT ENFORCED
          |) WITH (
          |    'connector' = 'clickhouse',
          |    'url' = 'clickhouse://<host>:<port>',
          |    'table-name' = 'd_sink_table',
          |    'sink.batch-size' = '1000',
          |    'sink.write-local' = 'true',
          |    'sink.partition-strategy' = 'hash',
          |    'sink.partition-key' = 'name'
          |)
          |""".stripMargin
    
        tEnv.executeSql(create_sql);
    
        tEnv.executeSql(
          "INSERT INTO sink_table SELECT name, grade, rate FROM source"
        )
      }
    }
  4. 编译运行。
    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar

Flink与ClickHouse的数据类型映射

Flink的数据类型 ClickHouse的数据类型
BOOLEAN UInt8
TINYINT Int8
SMALLINT Int16
INTEGER Int32
INTERVAL_YEAR_MONTH Int32
BIGINT Int64
INTERVAL_DAY_TIME Int64
FLOAT Float32
DOUBLE Float64
CHAR String
VARCHAR String
BINARY FixedString
VARBINARY FixedString
DATE Date
TIME_WITHOUT_TIME_ZONE DateTime
TIMESTAMP_WITH_TIME_ZONE DateTime
TIMESTAMP_WITHOUT_TIME_ZONE DateTime
DECIMAL Decimal

FAQ

如何开启多线程写入?
通过 Flink 的SetParallelism进行配置。