文档

配置作业日志输出

更新时间:

除直接在Flink控制台作业探查页面查看作业日志外,您还可以将作业日志配置输出至外部存储(OSS、SLS或Kafka)后进行查看。本文为您介绍如何配置作业日志输出,配置后您可以在对应存储中查看作业日志。

注意事项

  • 配置日志到其他存储后,如果未关闭日志归档功能,购买工作空间时配置的OSS仍会持续保存日志。关闭后,Flink控制台页面也将无法查看作业日志。

    image

  • 配置日志输出至OSS、SLS或Kafka后,需要重启作业。

  • 您可以在日志配置中使用${secret_values.xxxx},引用已经在密钥托管中设置的变量,详情请参见密钥管理

配置单个作业日志输出

  1. 进入配置单个作业日志输出入口。

    1. 登录实时计算管理控制台

    2. Flink全托管页签,单击目标工作空间操作列下的控制台

    3. 在左侧导航栏上,单击作业运维,单击目标作业名称。

    4. 部署详情页签,单击日志配置区域右侧的编辑

    5. 日志模板选择为自定义模板

  2. 配置日志输出信息。

    按照存储对象,将对应配置信息复制粘贴到输入框中,并修改指定参数取值为您目标存储信息。

    配置到OSS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender> 
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="20 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="4"/> 
        </Appender>  
        <Appender name="OSS" type="OSS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
          <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
          <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
          <Property name="flushIntervalSeconds">10</Property>  
          <Property name="flushIntervalEventCount">100</Property>  
          <Property name="rollingBytes">10485760</Property>  
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="OSS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    参数

    说明

    YOUR-BUCKET-NAME

    替换成您OSS Bucket名称。

    YOUR-ENDPOINT

    替换成您OSS的Endpoint,详情请参见访问域名和数据中心

    Endpoint为ECS的VPC网络访问(内网) 所在行的Endpoint(地域节点)信息。

    YOUR-OSS-ACCESSKEYID

    替换成您配置OSS服务账号的AccessKey ID和AccessKey Secret,获取方法请参见获取AccessKey

    为了避免明文AccessKey带来的安全风险,本示例通过密钥管理的方式填写AccessKey取值,详情请参见密钥管理

    说明

    仅当您配置输出到与Flink全托管不同账号下的OSS时,该参数必填。相同账号时无需填写,请删除该参数。

    YOUR-OSS-ACCESSKEYSECRET

    flushIntervalSeconds

    日志同步到存储中的时间间隔,即间隔多久将日志数据写入一次,单位是秒。

    flushIntervalEventCount

    日志同步到存储中的条数间隔,即获取多少条日志数据后写入一次。

    说明

    与flushIntervalSeconds同时配置时,哪个先达到设置值就写入一次。

    rollingBytes

    OSS中单个日志的大小。达到最大值后,后续写入的数据会写入一个新的日志文件。

    配置到SLS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender>  
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="5 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="1"/> 
        </Appender>  
        <Appender name="SLS" type="SLS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
    
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="project">YOUR-SLS-PROJECT</Property>  
          <Property name="logStore">YOUR-SLS-LOGSTORE</Property> 
          <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property> 
          <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> 
          <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
          <Property name="deploymentName">{{ deploymentName }}</Property>
          <Property name="flushIntervalSeconds">10</Property>
          <Property name="flushIntervalEventCount">100</Property>
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="SLS"/> 
        </Root>
      </Loggers> 
    </Configuration>
    说明

    代码中的namespace、deploymentId、jobId、deploymentName为Twig变量,您无需修改,修改后会导致作业启动时报错。

    参数

    说明

    YOUR-SLS-PROJECT

    替换成您SLS的Project名称。

    YOUR-SLS-LOGSTORE

    替换成您SLS的Logstore名称。

    YOUR-SLS-ENDPOINT

    替换成您SLS所在地域的私网Endpoint,详情请参见服务入口

    YOUR-SLS-ACCESSKEYID

    替换成您配置SLS服务账号的AccessKey ID和AccessKey Secret,获取方法请参见获取AccessKey

    为了避免明文AccessKey带来的安全风险,本示例通过密钥管理的方式填写AccessKey取值,详情请参见密钥管理

    说明

    如果您配置的SLS和Flink全托管服务不在同一账号时,需要给SLS服务账号配置Flink全托管账号可以写入SLS的权限,具体操作详情请参见创建自定义权限策略,具体的策略内容信息如下:

    • 不限制SLS范围

      {
      
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • 指定SLS资源范围,示例如下。

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    YOUR-SLS-ACCESSKEYSECRET

    flushIntervalSeconds

    日志同步到存储中的时间间隔,即间隔多久将日志数据写入一次,单位是秒。

    flushIntervalEventCount

    日志同步到存储中的条数间隔,即获取多少条日志数据后写入一次。

    说明

    与flushIntervalSeconds同时配置时,哪个先达到设置值就写入一次。

    配置到Kafka

    说明

    暂不支持开启Kerberos认证的Kafka集群。

    • 前提条件

      实时计算提供的KafkaAppender日志插件是通过Flink的插件类加载器加载,使用前需要显示指定KafkaAppender日志插件所在的包路径,使得Flink应用可以成功加载到该插件。具体的操作方式如下:

      • 配置作业模板(对该项目空间下所有作业生效)

        在Flink开发控制台配置管理页面的其他配置中,添加如下代码。

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
      • 配置单个作业(仅对当前作业生效)

        作业运维页面,单击目标作业名称,在部署详情页签的运行参数配置区域的其他配置中,添加如下代码。

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • 日志配置

      <?xml version="1.0" encoding="UTF-8"?>
      <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
      strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
        <Appenders> 
          <Appender name="StdOut" type="Console"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> 
          </Appender>  
          <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>  
            <Policies> 
              <SizeBasedTriggeringPolicy size="20 MB"/> 
            </Policies>  
            <DefaultRolloverStrategy max="4"/> 
          </Appender>  
          <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
              <Layout type="PatternLayout" pattern="%date %message"/>
              <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
               <Property name="acks">YOUR-ACKS-VALUE</Property>
               <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
                <Property name="retries">YOUR-RETRIES-NUMBER</Property>
               <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
          </Appender>
          <Appender type="Async" name="AsyncAppender">
              <AppenderRef ref="KafkaVVPAppender"/>
          </Appender>
        </Appenders>
        <Loggers> 
          <Logger level="INFO" name="org.apache.hadoop"/>  
          <Logger level="INFO" name="org.apache.kafka"/>  
          <Logger level="INFO" name="org.apache.zookeeper"/>  
          <Logger level="INFO" name="akka"/>  
          <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
          <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
          {%- for name, level in userConfiguredLoggers -%} 
            <Logger level="{{ level }}" name="{{ name }}"/> 
          {%- endfor -%}
          <Root level="{{ rootLoggerLogLevel }}"> 
            <AppenderRef ref="StdOut"/>
            <AppenderRef ref="RollingFile"/>  
            <AppenderRef ref="AsyncAppender"/> 
          </Root>
        </Loggers>
      </Configuration>

      参数

      说明

      YOUR-TOPIC-NAME

      写入的Kafka Topic名称。

      YOUR-KAFKA-BOOTSTRAP-SERVERS

      写入的Kafka Broker地址。

      YOUR-ACKS-VALUE

      指定了必须有多少个分区副本收到消息,Producer才会认为消息写入是成功的。具体取值请参见acks

      YOUR-BUFFER-MEMORY-SIZE

      Producer缓冲区大小。单位Byte。

      YOUR-RETRIES-NUMBER

      发送失败的重试次数。

      YOUR-COMPRESSION-TYPE

      Producer生成数据时可使用的压缩类型,包括none、gzip、snappy、lz4或zstd。

      说明

      您还可以设置所有Apache Kafka client支持的配置参数,详情请参见Apache Kafka

  3. 单击保存

  4. 单击页面顶部的启动

配置工作空间下所有作业日志输出

您可以通过配置模板的方式,配置工作空间下所有作业日志默认输出到OSS、SLS或Kafka。

重要

配置后,后续该工作空间下创建的所有作业的日志都会被存储到OSS、SLS或Kafka。

  1. 进入到作业日志模板配置入口。

    1. 登录实时计算控制台

    2. Flink全托管页签,单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击配置管理

    4. 作业默认配置页签,选择作业类型。

    5. 日志配置区域,日志模板选择为自定义模板

    6. 参考配置单个作业日志输出,配置项目空间下所有作业的日志输出信息。

  2. 单击保存更改

相关文档

  • 本页导读 (1)
文档反馈