本文汇总了实时计算Flink版基本概念、作业操作、调试运维等方面的常见问题。

什么是实时计算?

数据的业务价值会随着时间的流失而迅速降低,因此在数据发生后必须尽快对其进行计算和处理,目前,对于信息的高时效性和可操作性要求越来越高,这就要求软件系统能够在更短的时间内处理更多的数据。而传统的大数据处理模型将在线事务处理和离线分析从时序上完全分开,对于数据加工均遵循传统的日清日毕模式,即以小时甚至以天为计算周期对当前数据进行累计并处理。显然,传统的大数据处理方式无法满足数据实时计算的需求。数据处理时延造成的影响对要求苛刻的业务场景会非常明显,例如实时大数据分析、风控预警、实时预测和金融交易等领域。

实时计算可以有效地缩短全链路数据流时延、实时化计算业务逻辑、平摊计算成本,最终有效满足实时处理大数据的业务需求。

实时计算具备三大特点:
  • 实时(Realtime)且无界(Unbounded)的数据流

    实时计算面对的数据是实时且流式的,这些数据按照时间发生顺序被实时计算订阅和消费。例如网站的访问单击日志流,只要网站不关闭,其单击日志流将不停产生并进入实时计算系统。

  • 持续(Continous)且高效的计算

    实时计算是一种事件触发的计算模式,触发源为无界流式数据。一旦有新的流数据进入实时计算系统,它就立刻发起并进行一次计算任务,因此整个过程是持续进行的。

  • 流式(Streaming)且实时的数据集成

    被流数据触发的计算结果,可以被直接写入目的数据存储。例如将计算后的报表数据直接写入阿里云关系型数据库RDS(Relational Database Service)进行报表展示。因此流数据的计算结果可以同流式数据一样,持续被写入目的数据存储。

实时计算与批量计算相比存在哪些差异?

下面从用户和产品层面来理解两类计算方式的区别。
  • 批量计算
    批量计算是一种批量、高时延、主动发起的计算。目前绝大部分传统数据计算和数据分析服务均是基于批量数据处理模型:使用ETL系统或者OLTP系统进行构造数据存储,在线的数据服务(包括Ad-Hoc查询、DashBoard等)通过构造SQL语言访问上述数据存储并取得分析结果。这套数据处理的方法论伴随着关系型数据库在工业界的演进而被广泛采用。传统的批量数据处理模型如下图所示。批量计算模型
    1. 装载数据

      对于批量计算,用户需要预先将数据加载到计算系统,您可以使用ETL系统或者OLTP系统装载原始数据。系统将根据自己的存储和计算情况,对于装载的数据进行一系列查询优化、分析和计算。

    2. 提交请求
      系统主动发起一个计算作业(例如MaxCompute的SQL作业,或Hive的SQL作业)并向上述数据系统进行请求。此时计算系统开始调度(启动)计算节点进行大量数据计算,该过程的计算量可能非常大,耗时长达数分钟乃至于数小时。由于数据累计处理不及时,上述计算过程中可能就会存在一些历史数据,导致数据不新鲜。
      说明 您可以根据业务需要随时调整计算SQL后再次提交作业,您甚至可以使用AdHoc查询做到即时修改即时查询。
    3. 返回结果

      计算作业完成后将数据以结果集形式返回给用户,由于保存在数据计算系统中的计算结果数据量巨大,需要用户再次集成数据到其他系统。一旦数据结果巨大,整体的数据集成过程就会漫长,耗时可能长达数分钟乃至于数小时。

  • 实时计算
    实时计算是一种持续、低时延、事件触发的计算作业。相对于批量计算,流式计算整体上还属于比较新颖的计算概念。由于当前实时计算的计算模型较为简单,所以在大部分大数据计算场景下,实时计算可以看做是批量计算的增值服务,实时计算更强调计算数据流和低时延。实时计算数据处理模型如下。流计算模型
    1. 实时数据流

      使用实时数据集成工具,将实时变化的数据传输到流式数据存储(例如消息队列、DataHub)。此时数据的传输实时化,将长时间累积的大量数据平摊到每个时间点,不停地小批量实时传输,因此数据集成的时延得以保证。

      源源不断的数据被写入流数据存储,不需要预先加载的过程。同时,流计算对于流式数据不提供存储服务,数据持续流动,在计算完成后就被立刻丢弃。

    2. 提交流式任务

      批量计算要等待数据集成全部就绪后才能启动计算作业,而流式计算作业是一种常驻计算服务。实时计算作业启动后,一旦有小批量数据进入流式数据存储,流计算会立刻计算并得出结果。同时,阿里云流计算还使用了增量计算模型,将大批量数据分批进行增量计算,进一步减少单次运算规模并有效降低整体运算时延。

      从用户角度,对于流式作业,必须预先定义计算逻辑,并提交到流式计算系统中。在整个运行期间,流计算作业逻辑不可更改。用户通过停止当前作业运行后再次提交作业,此时之前已经计算完成的数据是无法重新再次被计算。

    3. 实时结果流

      不同于批量计算,结果数据需等待数据计算结果完成后,批量将数据传输到在线系统。流式计算作业在每次小批量数据计算后,无需等待整体的数据计算结果,会立刻将数据结果投递到在线/批量系统,实现计算结果的实时化展现。

    使用实时计算的顺序如下:
    1. 提交实时计算作业。
    2. 等待流式数据触发实时计算作业。
    3. 计算结果持续不断对外写出。
计算模型差别对比。
对比指标 批量计算 实时计算
数据集成方式 预先加载数据。 实时加载数据到实时计算。
使用方式 业务逻辑可以修改,数据可重新计算。 业务逻辑一旦修改,之前的数据不可重新计算(流数据易逝性)。
数据范围 对加载的所有或大部分数据进行查询或处理。 对滚动时间窗口内的数据或仅对最近的数据记录进行查询或处理。
数据大小 大批量数据。 单条记录或几条记录的微批量数据。
性能 几分钟至几小时的延迟。 大约几秒或几毫秒的延迟。
分析 复杂分析。 简单的响应函数、聚合和滚动指标。

阿里云实时计算Flink版适用哪些场景?

阿里云实时计算Flink版提供标准的Flink SQL语义可以协助您完成流式计算逻辑的处理,但无法满足某些特定场景的业务需求。阿里云实时计算Flink版为部分授信用户提供全功能的UDF函数,帮助授信用户完成业务定制化的数据处理需求。所以,在流数据分析领域,您可以直接使用Flink SQL+UDF即可完成大部分流式数据分析处理逻辑。

阿里云实时计算Flink版优势和限制如下:
  • 阿里云实时计算Flink版更擅长于进行流式数据分析、统计和处理。
  • 阿里云实时计算Flink版不适合于SQL不能够解决的领域。例如,复杂的迭代数据处理和复杂的规则引擎告警。

阿里云实时计算Flink版使用有哪些限制?

请您评估以下限制对您业务的影响,并做相应的调整。如有疑问,请您提交工单进行咨询。
  • 阿里云实时计算Flink版购买指导,请参见如何购买
  • 阿里云实时计算Flink版WebConsole仅支持Chrome浏览器访问。
  • 阿里云实时计算Flink版支持华东、华北、华南地域,您可以按区域购买。
  • 阿里云实时计算Flink版在简单的流式压测处理场景下(例如过滤、清洗等),一个实时计算CU的处理能力为1000条/秒。在复杂的流式压测处理场景下(例如复杂UDF计算、聚合操作等),一个实时计算CU的处理能力为500条/秒。
    说明 请根据您的业务情况,评估所需的CU数量。
  • 阿里云实时计算Flink版对整个Project下属的Task、Task版本、IDE打开Task页面均有不同限制,包括:
    • 一个阿里云账号可以购买多个Project项目。
    • 单个Project下允许最多创建100个job。
    • 单个Project下最多允许50个文件夹,深度最大不超过5层。
    • 单个Project下最多允许50个UDF/JAR。
    • 单个Project下最多允许50个数据存储注册。
    • 单个Task最多允许20个历史版本保存数。

什么是流数据?

所有大数据的产生均可以看作是一系列离散事件,这些离散事件是一条条事件流或数据流。相对于离线数据,流数据的规模普遍较小。流数据是由数据源持续产生的数据,示例如下:
  • 使用移动或Web应用程序生成的日志文件。
  • 网购数据。
  • 游戏内玩家活动信息。
  • 社交网站信息。
  • 金融交易大厅或地理空间服务数据中心内所连接设备或仪器的遥测数据。
  • 地理空间服务信息。
  • 设备或仪器的遥测数据。

阿里云实时计算Flink版的数据存储有哪些类型?

阿里云实时计算Flink版本身不带有业务存储,所有的数据均是来自于外部存储系统持有的数据。阿里云实时计算Flink版支持以下几类数据存储类型:
  • 流式的数据输入:流式数据的输入会触发和推动实时计算Flink版进行数据处理。每个实时计算Flink版作业必须至少声明一个流式数据输入源。
  • 静态数据输入:静态数据输入也被称为维表,对于每条流式数据,可以关联一个外部静态数据源进行查询,为实时计算Flink版提供数据关联查询。
  • 结果表输出:实时计算Flink版将计算的结果数据写出到目的数据表,为下游数据继续消费提供各类读写接口。

实时计算Flink版的全链路示意图是什么样的?

不同于现有的离线或批量计算模型,实时计算Flink版全链路整体上更加强调数据的实时性,包括数据的实时采集、实时计算和实时集成。三大类数据的实时处理逻辑在全链路上保证了流式计算的低时延。全链路实时计算Flink版示意图如下。全链路示意图
  1. 数据采集

    使用流式数据采集工具将数据实时地采集并传输到大数据消息Pub/Sub系统,该系统将为下游实时计算Flink版提供源源不断的事件源去触发流式计算作业的运行。

  2. 数据触发

    流数据作为实时计算Flink版的触发源,驱动实时计算Flink版运行。因此,一个实时计算Flink版作业必须至少使用一个流数据作为源。进入的数据流将直接触发实时计算Flink版进行计算。

  3. 数据集成

    实时计算Flink版将计算的结果数据直接写入目的数据存储,这其中包括多种数据存储系统、消息投递系统,甚至直接对接业务规则告警系统以发出告警信息。不同于批量计算(阿里云MaxCompute或开源Hadoop),实时计算Flink版天生自带数据集成模块,可以将结果数据直接写入到目的数据存储。

  4. 数据消费

    一旦结果数据从实时计算Flink版投递到目的数据源后,数据消费就和实时计算Flink版完全解耦。您可以使用数据存储系统访问数据,使用消息投递系统进行信息接收,或者直接使用告警系统进行告警。

实时计算Flink产品全链路使用流程是怎样的?

  1. 验证网络连通性。

    使用网络探测功能验证,详情请参见存储注册方式中的网络探测

  2. 引用外部存储。
    详情请参见数据存储概述文档引用外部存储。
    说明 如果不支持注册存储功能,请使用明文方式
  3. 编写源表、结果表、维表(可选)的DDL和DML语句。
    CREATE TABLE test_input (
      id     BIGINT,
      `name` VARCHAR,
      score  INT,
       `time` BIGINT,
        tbirus AS TO_TIMESTAMP(`time`/1000), --使用计算列,将16位时间戳转换为13位时间戳。
      WATERMARK FOR ts AS WITHOFFSET(ts, 1000) --使用窗口,其中CEP、OVER窗口需要声明。
    )with(
        type='datahub',
        ...
    );
    
    CREATE TABLE test_output (
        window_start TIMESTAMP,
      window_end   TIMESTAMP,
        ct           BIGINT
    )with(
        type='rds',
        ...
    );
    
    --简单的滚动窗口示例。
    INSERT INTO
        test_output
    SELECT  
      TUMBLE_START(ts, INTERVAL '1' MINUTE),
        TUMBLE_END(ts, INTERVAL '1' MINUTE), 
        count(*)
    FROM test_input
    GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
  4. 语法检查。

    详情请参见语法检查

  5. 调试。
    • 调试结果仅供展示,不输出至结果表。
    • 部分外部存储支持数据源表抽取功能。例如,DataHub源表、TableStore源表。如果不支持数据源表抽取功能,请下载模板自行上传数据完成调试。
    • Kafka中VARBINARY类型不支持调试。
    • 部分函数例如EMIT语法不支持调试。
    • 调试输出结果中,同时会显示过程数据。
  6. 上线运行。

    详情请参见如何利用报错信息快速定位作业问题?

  7. 调优。

单个阿里云实时计算Flink版作业包含几个部分?

  • SQL代码:SQL代码是计算作业的核心业务逻辑,其中包括输入表DDL声明(流式输入表、静态输入表)、输出表DDL声明,以及执行业务逻辑的DML语句。
  • 参数:参数用来描述作业运行时指标。例如并发量、批处理数据量等信息。
  • 属性:作业的业务信息。例如创建人、创建时间等相关记录。
  • 资源配置:配置作业的资源。

实时计算Flink版数据的状态保存时间是多久?

实时计算Flink版数据的状态默认保存36小时,如果某个状态超过36小时没有更新,之前的状态可能会被清掉。状态设置步骤如下:
  1. 登录实时计算控制台
  2. 单击页面顶部的开发
  3. 在作业开发区域,双击目标文件夹或目标作业名,进入作业编辑页面。
  4. 在作业编辑区域右侧导航栏,单击作业参数
  5. 作业参数页面输入状态配置参数,示例如下。
    注意
    • 状态(state)的生命周期(ttl)设置过小,会导致retract时产生NullPointException的报错。
    • 状态(state)的生命周期(ttl)设置过大,会导致子线资源的消耗增大。
    • 实时计算Flink版1.0版本。
      #使用rocksdb作为statebackend。
      #rocksdb的数据生命周期,单位为毫秒。
      state.backend.rocksdb.ttl.ms=129600000
    • 实时计算Flink版2.0及以上版本。
      #使用niagara作为statebackend,以及设定state数据生命周期,单位为毫秒。
      state.backend.type=niagara
      state.backend.niagara.ttl.ms=129600000

如何加载数据到实时计算Flink版?

实时计算Flink版支持以下两种加载数据的方式:
  • 流式数据输入:触发实时计算Flink版进行数据处理,推动实时计算Flink版持续进行数据计算。每个实时计算作业必须至少声明一个流式数据输入源。
  • 静态存储:静态数据输入通常是维表,对于每条流式数据,可以关联一个外部静态数据源进行查询,为实时计算Flink版提供了数据关联查询。

实时计算Flink版处理后的结果数据如何继续应用?

实时计算Flink版将计算的结果数据输出到目的数据表,并且为下游数据继续应用提供各类读写接口。

实时计算Flink版中一个作业里可以编写多个INSERT语句、插入到多个数据结果表吗?

  • 可以编写存在多个结果表。使用多个INSERT(DML)语句的作业代码,分别插入数据结果表。
  • 可以编写存在多个源表的作业代码。多数据源表可以进行双流JOIN、UNION等操作。
注意 不建议将完全不相关的链路写在同一个作业中。如果某条链路出现问题,则会影响整个作业,其他不相关的链路也会受到影响。

开发调试和生产运维区别是什么?

开发调试和生产运维主要有以下两点区别:
  • 开发调试对线上作业和数据存储不造成任何影响。

    开发调试发生在调试环境,所有的Flink SQL运行将在独立的调试容器运行,且所有的输出将被直接改写到调试结果屏幕,因此不会对线上生产的实时计算Flink版作业、线上生产的数据存储系统造成任何影响。

  • 开发调试过程无法完全检测部分实际生产过程中的异常。

    开发调试过程实际上不是将调试数据写入到外部数据源,而是被实时计算Flink版拦截输出至调试结果页面。因此在实时计算Flink版的调试代码是在调试容器中运行的,所以线上运行过程中可能由于对目标数据源写入格式的不兼容,导致作业运行失败。这类错误在开发调试阶段无法被发现和处理,只能到线上运行后才能发现。

    例如,您的结果数据输出到RDS系统,其中某些字段输出字符串数据长度大于RDS建表长度最大值,在调试环境下系统无法测试出该类问题,但实际生产运行过程中会引发异常。后续,实时计算Flink版提供将调试结果写入真实数据源的本地调试功能,有效辅助您缩短调试和生产的差距,尽可能在调试阶段发现并解决问题。

作业暂停恢复和停止重启的区别是什么?

图 1. 实时计算Flink版作业生命周期状态
实时计算作业生命周期状态
暂停和停止的区别:
  • 停止操作会清除作业的所有状态,停止后再启动相当于全新的一个作业。
  • 暂停操作会保留作业的所有状态:
    • 如果状态可以兼容,暂停后会复用暂停前的作业状态。
    • 如果状态无法兼容,则会报错无法恢复。
      注意 如果无法兼容,需要停止作业后,启动作业,否则作业状态将会被清理,请谨慎操作。
    暂停作业后,再恢复作业,作业状态是否兼容主要取决于实际生成的物理执行图是否一致,和具体的底层优化策略相关。暂停恢复后作业的兼容性如下表所示。
    操作变更类型 暂停恢复后状态是否兼容
    修改SQL 不兼容
    修改资源配置 兼容
    修复作业参数
    • 增删作业参数不兼容。
    • 修改作业参数值一般情况下兼容,但改变部分optimizer参数值,修改statebackend类型等会导致不兼容。
    修改WITH参数
    • 增删WITH参数不兼容。
    • 修改WITH参数值一般情况下兼容,但类似于维表async true <-> false等情况会导致不兼容。
    修改资源引用 通常情况下兼容,但类似于用UDF变更影响Schema或者UDAGG则不兼容。
    修改作业版本 自Blink 3.2.3以后,最后一位版本号修改可兼容。不兼容其它版本切换。

操作变更生效的前提是作业重新上线,并且恢复时选择按最新配置恢复。如果选择按之前配置恢复,则所有操作变更均不生效,效果等同于原作业暂停恢复。

什么是启动位点?

启动位点是读取数据的时间点。启动位点的时间点以后产生的数据,都将按照时间的顺序输入至新的作业中进行计算。

阿里云实时计算Flink版提供的编程接口是什么?

阿里云实时计算Flink版提供Flink SQL编程接口编写业务逻辑,为流式数据分析定制多种数据处理函数和操作符。以Word Count为例,通过Flink SQL编写的业务逻辑如下所示。
--声明一个流式源表。
create table stream_source(word string) ;

--声明一个目标表。
create table stream_result(word string, cnt bigint) ;

--统计word次数。
insert into stream_result
select t.word,
       count(1)
from stream_source t
group by t.word;

实时计算Flink版SQL作业支持全局ORDER BY吗?

实时计算Flink版作业不支持全局ORDER BY,ORDER BY需要在OVER窗口中使用。详情请参见TopN语句

无窗口和窗口这两类SQL的业务含义是什么?

  • 窗口函数Group Window Aggregation
    Group Window Aggregation是一种在窗口上的聚合。Group Window Aggregation在每个窗口结束且无提前的观测值(early firing)时,会发出的一条结果数据,例如MicroBatch。时间窗口有以下几种:
    • TUMBLE:滚动窗口
    • HOP:滑动窗口
    • SESSION:会话窗口
    例如,统计过去的1分钟内,多少用户单击了某个网页。在这种情况下,我们可以定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
    • 测试数据
      username (VARCHAR) click_url (VARCHAR) ts (TIMESTAMP)
      Jark http://taobao.com/xxx 2017-10-10 10:00:00.0
      Jark http://taobao.com/xxx 2017-10-10 10:00:10.0
      Jark http://taobao.com/xxx 2017-10-10 10:00:49.0
      Jark http://taobao.com/xxx 2017-10-10 10:01:05.0
      Jark http://taobao.com/xxx 2017-10-10 10:01:58.0
      Timo http://taobao.com/xxx 2017-10-10 10:02:10.0
      INSERT INTO tumble_output
      SELECT
      TUMBLE_START(ts, INTERVAL '1' MINUTE),
      TUMBLE_END(ts, INTERVAL '1' MINUTE),
      username,
      COUNT(click_url)
      FROM window_input
      GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username;
    • 测试结果
      window_start (TIMESTAMP) window_end (TIMESTAMP) username (VARCHAR) clicks (BIGINT)
      2017-10-10 10:00:00.0 2017-10-10 10:01:00.0 Jark 3
      2017-10-10 10:01:00.0 2017-10-10 10:02:00.0 Jark 2
      2017-10-10 10:02:00.0 2017-10-10 10:03:00.0 Timo 1
  • 无窗口函数Group Aggregation
    Group Aggregation是一种无限流的聚合。因为没有窗口,所以每到达一条数据就会增量计算一次,并发出更新后的结果。实时计算Flink版状态保存在以下三层存储媒介:
    • 第一层:内存,作为缓存来使用。
    • 第二层:硬盘,实时计算使用SSD作为存储。
    • 第三层:HDFS。
    因此,不存在内存占满的问题。
    • 测试数据
      Customer OrderPrice
      Bush 1000
      Carter 1600
      Bush 700
      Bush 300
      Adams 2000
      Carter 100
      SELECT Customer,SUM(OrderPrice) FROM XXXX
      GROUP BY Customer;
    • 测试结果
      Customer SUM(OrderPrice)
      Bush 2000
      Carter 1700
      Adams 2000