本文汇总了实时计算上下游存储常见报错与解决方案。

如何避免结果表写入不同步问题?

  • 问题描述

    把相同的数据保存到属于不同数据库但表结构相同的两个结果表里,两张表数据的写入速率不同,但最终的结果一致。

  • 问题原因

    由于结果表的并发量不同,数据分发出现数据乱序。

  • 解决方案
    • 对作业进行自动配置调优,详情请参见自动配置调优
    • 为两个并发不同的结果表设置相同的节点,让两个节点串联(Chain)在一起。详情请参见手动配置调优

存放的数据产品不在实时计算支持的存储列表,应该如何处理?

  • 如果您所选择的阿里云存储产品不在实时计算系统支持范围之内,请您提交工单进行咨询。
  • 如果您使用了自建开源存储,需要将您的数据迁移到实时计算支持的列表存储中。数据迁移的过程中如果遇到问题,您可以提交工单进行咨询。

DataHub作为结果表存在大量重复数据,应该如何处理?

  • 问题描述

    从DataHub的Topic A中读取数据,通过实时计算处理后写入到DataHub中的Topic B,发现有大量重复数据。

  • 原因分析

    上述问题是由DataHub存储机制导致的。DataHub是一种消息队列服务,写入一条数据,就存储一条数据,不会对数据的重复性进行判断,而会将每一条输入的数据当成新数据进行储存。在此场景中,运算过程中的计算数据同样被DataHub储存,导致产生重复数据。

  • 解决方案

    请您将结果表换成RDS。RDS会根据主键更新结果,不会储存计算过程数据。

DataHub中Shard在只读状态下可以被读取数据吗?

  • 问题描述
    DataHub中Shard在只读状态下可以被读取数据吗?shard
  • 问题解析

    Shard在可读写状态下是可以被读取数据的。上图中Shard状态为CLOSED,而CLOSED是只读状态,并非可读写状态,因此无法被读取。关于Shard状态说明,请参见Shard状态说明

实时计算每处理一条记录都会去新增或者更新RDS数据库表吗?将实时计算结果写入数据库,是否会对RDS产生性能影响?

实时计算的每条结果数据会触发对RDS的写入。如果存在KEY,重复的值会更新(UPDATE),不重复的值会新增(INSERT)。您可以通过设置Batchsize参数来降低对RDS性能的影响。

实时计算上游数据源是否支持RDS数据库?

不支持,实时计算支持以下两类上游数据源来驱动计算:
  • 消息队列类型。例如,消息队列Kafka、消息队列RocketMQ版和DataHub等。
  • 非消息队列类型。例如,大数据计算服务(MaxCompute,原名ODPS)和表格存储Tablestore等。

如何使用Java连接实时计算与消息队列?

您可以使用Flink Datastream的方式实现实时计算读取消息队列RocketMQ版中的数据。详情请参见概述

如何把结果数据输出到一个存在主键限制的外部存储?

A:例如,输出结果数据到一张存在PRIMARY KEY限制的MySQL表,MySQL建表语句如下。
CREATE TABLE words ( 
  word VARCHAR(30) NOT NULL,
  cnt BIGINT,
  CONSTRAINT word_pk PRIMARY KEY(word)
);
Q:Blink统计每个word的出现频次后,把结果输出到真实的物理表words中。Blink定义的结果表rds_output的DDL中声明了PRIMARY KEY为word,Sink处理每条输入数据的逻辑为:
  • 如果words表中不存在和rds_output主键一致的数据(w,c1),则在words表插入数据(w,c1)
  • 如果words表中存在和rds_output主键一致的数据(w,c1),则在words表中更新该条记录,即替换(w,c)(w,c1)
说明
  • Blink结果表rds_output定义中的PRIMARY KEY必须和MySQL真实的物理表的PRIMARY KEY一致。
  • 如果rds_output表DDL里未声明PRIMARY KEY,在作业运行阶段,向words表中插入一条word已经存在的记录,则会因为MySQL的主键限制引发运行异常。
---Split用于把句子切分成单词。
CREATE FUNCTION split AS 'com.hjc.test.blink.sql.udx.SplitUdtf';

CREATE TABLE tt_input(
  sentences VARCHAR
) with (
  type='datahub',
  endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
  project='<yourProjectName>',
  topic='<yourTopic>',
  accessId='<yourAccessID>',
  accessKey='<yourAccessSecret>'
);

CREATE TABLE rds_output(
  word VARCHAR,
  cnt BIGINT,
  PRIMARY KEY(word)
) with (
  type='rds',
  url='<yourDatabaseURL>',
     tableName='words',
     userName='<yourDatabaseUserName>',
     password='<yourDatabasePassword>'
);

INSERT INTO rds_output
  SELECT word, COUNT(1) AS cnt FROM 
  (
    SELECT word FROM tt_input, LATERAL TABLE(split(sentences)) as T(word)
  ) GROUP BY word;

使用SINK WITH PRIMARY KEY有什么限制?

SINK WITH PRIMARY KEY用于把数据输出到一个有主键限制的外部存储中,例如MySQL。使用SINK WITH PRIMARY KEY时,必须同时满足以下两个条件:
  • Sink的外部存储必须具有PRIMARY KEY限制,且Blink结果表定义里声明的PRIMARY KEY必须和物理存储的PRIMARY KEY一致。
  • Sink必须具有按主键进行INSERT、UPDATE和DELETE的能力。

哪些类型的结果表支持在DDL中定义PRIMARY KEY,并能够按照PRIMARY KEY进行INSERT、UPDATE和DELETE?

首先,结果表对应的外部存储必须具有PRIMARY KEY限制。其次,Connector Sink支持单个并发间按照PRIMARY KEY进行INSERT、UPDATE和DELETE。同时满足以上两个条件,才可以在Sink对应的DDL中声明PRIMARY KEY。

Blink内置的Connectors中,仅以下Connectors支持按照PRIMARY KEY进行INSERT、UPDATE和DELETE:
  • 云数据库HBase版
  • 交互式分析Hologres
  • 分析型数据库MySQL3.0
  • 分析型数据库MySQL
  • 表格存储TableStore
  • 云数据库RDS SQL Server版
  • 云数据HybridDB for MySQL
  • 云数据库Redis版
  • 云数据库RDS PostgreSQL版
  • 云数据库RDS版
  • Oracle
  • Elasticsearch
  • Phoenix5

在Sink节点的并发度不为1,多个并发中存在相同PRIMARY KEY的数据时,Sink节点如何保证多个并发间对相同PRIMARY KEY的数据更新顺序一致?

不能保证多并发间对相同PRIMARY KEY的数据更新顺序一致。因为多个并发间如果存在相同PRIMARY KEY的数据,最终写入到外部存储的数据取决于各个并发间完成写动作的前后顺序,这将导致结果的不确定性。

为了避免因为Sink节点多并发导致的数据不确定性,需要将相同PRIMARY KEY的数据落在同一个Sink并发节点上,以下Pattern都可以让Sink的输入数据满足要求:
  • {Aggregate -> Sink}{Aggregate -> [非状态节点,例如calculate] -> Sink}
  • {双流 Join -> Sink}{双流 Join -> [非状态节点,例如calculate] -> Sink}
其中Aggregate的GroupKey或双流Join的JoinKey需要满足以下条件:
  • SINK PRIMARY KEY需要和JoinKey或GroupKey任何一个相同。
  • SINKPRIMARY KEY必须是JoinKey或GroupKey任何一个的子集。
  • 如果SINK PRIMARY KEY不和JoinKey或GroupKey任何一个相同,且不是它们任何一个的子集,需要显式的从JoinKey或GroupKey转换到SINK PRIMARY KEY,最直接的方式就是加一层Aggregate,从而以PRIMARY KEY作为Aggregate的GroupKey。

为什么相同PRIMARY KEY的数据写到外部存储的顺序和Sink的输入流相同PRIMARY KEY的数据顺序不一致?

  • 作业SQL有问题
    例如,由于Sink的PRIMARY KEY和上游的Aggregate的GroupKey并无关联,因此,相同PRIMARY KEY的数据落在不同的并发,最终导致外部存储中的数据顺序和Sink的输入流的不一致,因而数据结果也不正确。
    • 错误示例1
      ---Split用于把句子切分成单词。
      CREATE FUNCTION split AS 'com.hjc.test.blink.sql.udx.SplitUdtf';
      
      CREATE TABLE tt_input(
        sentences VARCHAR
      ) with (
        type='datahub',
        endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
        project='<yourProjectName>',
        topic='<yourTopic>',
        accessId='<yourAccessID>',
        accessKey='<yourAccessSecret>'
      );
      
      ---错误示例:Sink的PRIMARY KEY和Aggregate的GroupKey不一致。
      CREATE TABLE rds_output(
        word VARCHAR,
        cnt BIGINT,
        PRIMARY KEY(cnt)
      ) with (
        type='rds',
        url='<yourDatabaseURL>',
           tableName='words'
           userName='<yourDatabaseUserName>',
           password='<yourDatabasePassword>'
      );
      
      INSERT INTO rds_output
        SELECT word, COUNT(1) AS cnt FROM 
        (
          SELECT word FROM tt_input, LATERAL TABLE(split(sentences)) as T(word)
        ) GROUP BY word
      说明 该示例中,Sink的PRIMARY KEY和Aggregate的GroupKey不一致,导致相同cnt的数据落在不同的并发上,最终写到Sink里的数据不确定。PRIMARY KEY为1的记录可能为('Hello',1)('word',1),这取决于哪条记录最后写到RDS。
      图1
    • 错误示例2
      ---Split用于把句子切分成单词。
      CREATE FUNCTION split AS 'com.hjc.test.blink.sql.udx.SplitUdtf';
      
      CREATE TABLE tt_input(
        sentences VARCHAR
      ) with (
        type='datahub',
        endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
        project='<yourProjectName>',
        topic='<yourTopic>',
        accessId='<yourAccessID>',
        accessKey='<yourAccessSecret>'
      );
      
      --- 错误示例:Sink的Primary和Aggregate的GroupKey不一致。
      CREATE TABLE rds_output(
        word VARCHAR,
        cnt BIGINT,
        PRIMARY KEY(cnt)
      ) with (
          type='rds',
        url='<yourDatabaseURL>',
           tableName='words'
           userName='<yourDatabaseUserName>',
           password='<yourDatabasePassword>'
        -- 声明了partitionBy是cnt,Sink前强制按照partitionBy进行一次Shuffle。
        partitionBy = 'cnt'
      );
      
      INSERT INTO rds_output
        SELECT word, COUNT(1) AS cnt FROM 
        (
          SELECT word FROM tt_input, LATERAL TABLE(split(sentences)) as T(word)
        ) GROUP BY word
        ;
      说明 该示例中, Sink的Primary和Aggregate的GroupKey仍然不一致,和错误示范1唯一的区别为Sink声明了PartitionedKey,导致写入Sink前会按照partitionKey进行一次Shuffle。但是这样的结果仍然具有不确定性,PRIMARY KEY为1的记录可能为('Hello',1)('word',1),这取决于哪条记录最后写到RDS。
      图2
  • 作业PlanJson被破坏
    作业的PlanJson被手动修改了Sink或Aggregate并发度,导致Aggregate和下游的Sink不能Chain起来,破坏了Sink输入数据的Distribution属性。
    CREATE TABLE tt_input(
      word VARCHAR
    ) with (
      type='datahub',
      endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
      project='<yourProjectName>',
      topic='<yourTopic>',
      accessId='<yourAccessID>',
      accessKey='<yourAccessSecret>'
    );
    
    CREATE TABLE rds_output(
    word BIGINT,
    cnt BIGINT,
    PRIMARY KEY(word)
    ) with (
      type='rds',
      url='<yourDatabaseURL>',
         tableName='words'
         userName='<yourDatabaseUserName>',
         password='<yourDatabasePassword>'
    );
    
    INSERT INTO rds_output
      SELECT word, COUNT(1) AS cnt FROM tt_input GROUP BY word;
    说明 该示例中,SQL本身没有问题,但在PlanJson上手动调整Sink节点的并发度,导致原来的Forward的边变成了Rebalance的边。最终PRIMARY KEY为Hello对应的记录可能为('Hello',2),也可能不存在该PRIMARY KEY,这取决于哪条记录最后写到RDS。
    图3