本文汇总了Flink全托管常见问题与解决方案。

如何解决Flink依赖冲突问题?

  • 问题现象
    • 有明显报错,且引发错误的为Flink或Hadoop相关类。
      java.lang.AbstractMethodError
      java.lang.ClassNotFoundException
      java.lang.IllegalAccessError
      java.lang.IllegalAccessException
      java.lang.InstantiationError
      java.lang.InstantiationException
      java.lang.InvocationTargetException
      java.lang.NoClassDefFoundError
      java.lang.NoSuchFieldError
      java.lang.NoSuchFieldException
      java.lang.NoSuchMethodError
      java.lang.NoSuchMethodException
    • 无明显报错,但会引起一些不符合预期的现象,例如:
      • 日志不输出或log4j配置不生效。
        该类问题通常是由于依赖中携带了log4j相关配置导致的。需要排查作业JAR包中是否引入了log4j配置的依赖,可以通过在dependency中配置exclusions的方式去掉log4j配置。
        说明 如果必须要使用不同版本的log4j,需要使用maven-shade-plugin将log4j相关的class shade掉。
      • RPC调用异常。

        Flink的Akka RPC调用出现依赖冲突可能导致的异常,默认不会显示在日志中,需要开启Debug日志进行确认。

        例如,Debug日志中出现Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx},但是JM日志在Registering TaskManager with ResourceID xxx后,没有下文,直到资源请求超时报错NoResourceAvailableException。此外TM持续报错Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx}

        原因:开启DEBUG日志后,发现RPC调用报错InvocationTargetException,该报错导致TM Slot分配到一半失败出现状态不一致,RM持续尝试分配Slot失败无法恢复。

  • 问题原因
    • 作业JAR包中包含了不必要的依赖(例如基本配置、Fllink、Hadoop和Log4j依赖),造成依赖冲突从而引发各种问题。
    • 作业需要的Connector对应的依赖未被打入JAR包中。
  • 排查方法
    • 查看作业pom.xml文件,判断是否存在不必要的依赖。
    • 通过jar tf foo.jar命令查看作业JAR包内容,判断是否存在引发依赖冲突的内容。
    • 通过mvn dependency:tree命令查看作业的依赖关系,判断是否存在冲突的依赖。
  • 解决方案
    • 基本配置建议将scope全部设置为provided,即不打入作业JAR包。
      • DataStream Java
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataStream Scala
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataSet Java
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataSet Scala
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
    • 添加作业需要的Connector对应的依赖,并将scope全部设置为compile(默认的scope是compile),即打入作业JAR包中,以Kafka Connector为例,代码如下。
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
    • 其他Flink、Hadoop和Log4j依赖不建议添加。但是:
      • 如果作业本身存在基本配置或Connector相关的直接依赖,建议将scope设置为provided,示例如下。
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <scope>provided</scope>
        </dependency>
      • 如果作业存在基本配置或Connector相关的间接依赖,建议通过exclusion将依赖去掉,示例如下。
        <dependency>
            <groupId>foo</groupId>
              <artifactId>bar</artifactId>
              <exclusions>
                <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
               </exclusion>
            </exclusions>
        </dependency>

Python作业,如果Checkpoint慢怎么办?

  • 问题原因

    Python算子内部有一定的缓存,在进行Checkpoint时,需要将缓存中的数据全部处理完。因此,如果Python UDF的性能较差,则会导致Checkpoint时间变长,从而影响作业执行。

  • 解决方案
    在作业开发页面单击右侧的高级配置后,在更多Flink配置项中,设置以下参数,将缓存调小。
    python.fn-execution.bundle.size:默认值为100000,单位是条数。
    python.fn-execution.bundle.time:默认值为1000,单位是毫秒。
    参数的详细信息请参见Flink Python配置

Flink全托管集群如何访问公网?

  • 背景说明
    Flink全托管集群默认不具备访问公网的能力,但阿里云提供的NAT网关可以实现VPC网络与公网网络互通,以满足部分Flink全托管集群用户通过UDX或Datastream代码访问公网的需求。背景说明
  • 解决方案
    通过在VPC中创建NAT网关,并创建SNAT条目,将Flink全托管集群所在的交换机绑定至弹性公网IP(EIP),即可通过EIP访问公网。具体配置方法请参见:

如何访问跨VPC里的存储资源?

您可以通过以下几种方式跨VPC访问存储资源:
  • 提交工单,产品名称选择VPC,要求通过高速通道或其它产品打通网络,但是此种方式需要付费。
  • 使用VPN网关建立VPC到VPC的VPN连接,详情请参见建立VPC到VPC的连接
  • 退掉和Flink全托管不同VPC的存储服务后,重新购买一个与Flink全托管相同VPC的存储服务。
  • 释放Flink全托管服务后,重新购买一个和存储服务相同VPC的Flink全托管服务。
  • 开通Flink全托管的公网访问能力,通过公网访问存储服务。Flink全托管集群默认不具备访问公网的能力,如有需求,详情请参见Flink全托管集群如何访问公网?
    说明 因为在延迟性方面,公网不如内网,所以不推荐使用此方式。

如何查找引发告警的作业?

告警事件中包含JobID和DeploymentID信息,但是由于作业Failover后导致JobID发生变化,因此您需要通过Deployment ID查找是哪个作业报错。目前,Flink全托管控制台上暂无Deployment ID信息,您需要在作业的URL中获取Deployment ID信息。Deployment ID

如何查看工作空间ID等信息?

在管理控制台目标工作空间名称右侧,选择其他 > 工作空间详情工作空间详情

如何在OSS控制台上传JAR包?

  1. 在Flink全托管控制台上查看当前集群的OSS Bucket。OSS Bucket
    OSS Bucket信息如下图所示。Bucket详情
  2. 登录OSS控制台,上传JAR包至目标Bucket的/artifats/namespaces目录下。jar目录
  3. 在Flink全托管控制台左侧导航栏,单击资源上传,查看通过OSS控制台上传的JAR包。查看jar包

SQL作业如何配置Rocksdb Statebackend?

在目标作业详情页面右上角,单击编辑后,在其他配置中,添加如下两行代码后保存生效。
state.backend: rocksdb 
state.backend.incremental: true
详情

报错:undefined

  • 报错详情报错信息
  • 报错原因

    您的JAR包较大。

  • 解决方案

    您可以在OSS 管理控制台上传JAR包。

报错:OSSException

  • 报错详情
    新作业启动或已上线作业重启时报错。报错
  • 报错原因

    对象存储OSS开启了版本控制,导致存在大量的Object。

  • 解决方案
    1. OSS管理控制台,关闭版本控制,详情请参见版本控制
    2. oss://{bucket}/flink-jobs/namespaces/{namespace}/jobs/下的Object全部删除。
      ./ossutil64 rm oss://{bucket}/flink-jobs/namespaces/{namespace}/jobs/ --all-versions -r
      说明
      • 本文以ossutil,您也可以使用其他方式删除Object。
      • 命令详情参见rm
    3. 重启Flink全托管作业。

报错:exceeded quota: resourcequota

  • 报错详情
    作业启动过程中报错。报错
  • 报错原因

    当前项目空间资源不足导致作业启动失败。

  • 解决方案

    您需要对项目资源进行资源变配,详情请参见单项目资源变配

INFO:org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss

  • 报错详情报错详情
  • 报错原因

    OSS每次创建新目录时,会先检查是否存在该目录,如果不存在,就会报这个INFO信息,但该INFO信息不影响Flink作业运行。

  • 解决方案

    在日志模板中添加<Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>。详情请参见配置作业日志