支持流式入库的系统都基本遵循了一个思路,流式数据按照小批量数据写小文件到存储系统,然后定时合并这些文件。例如,Hive和Delta Lake。Kudu也支持流式入库,但是Kudu的存储是自己设计的,不属于基于大数据存储系统之上的解决方案。本文以Kafka数据源为例介绍,其余数据源根据控制台提示操作即可。

流式入库演变

阶段详细情况
以前以前针对流式入库的需求,通常都是自己动手,事实表按照时间划分Partition,粒度比较细。例如,五分钟一个Partition,每当一个Partition运行完成,触发一个INSERT OVERWRITE动作,合并该Partition内的文件重新写入分区。但是这么做有以下几个问题:
  • 缺少读写隔离,易造成读端失败或者产生数据准确性问题。
  • 流式作业没有Exactly-Once保证,入库作业失败后需要人工介入,确保数据不会写重或者写漏(如果是SparkStreaming,有At-Least-Once保证)。
Hive从0.13版本提供了事务支持,并且从2.0版本开始提供了Hive Streaming功能来实现流式入库的支持。但是在实际使用Hive Streaming功能的案例并不多见。其主要原因如下:
  • Hive事务的实现修改了底层文件,导致公共的存储格式等仅能够被Hive读取,导致很多使用SparkSQL、Presto等进行数据分析的用户无法使用该功能。
  • Hive事务目前仅支持ORC。
  • Hive的模式为Merge-on-read,需要对小文件进行Sort-Merge。小文件数量增多之后读性能急剧下降,所以用户需要及时进行小文件的合并。而小文件的合并作业经常失败,影响用户业务效率。
  • Hive这种模式无法拓展到Data Lake场景,仅仅停留在Data Warehouse场景。在Data Lake场景中,数据来源以及数据需求都是多样性的。
现在有了Delta,可以很方便地应对流式入库的场景。只需要以下四个动作:
  1. 建表。
  2. 启动Spark Streaming任务写入数据。
  3. 定时Optimize(例如:每个Partition写入完成)。
  4. 定时Vacuum(例如:每天)。

Delta实例展示

从上游Kafka中读取数据,写入Delta表。上游Kafka准备一个Python脚本,不断向Kafka内发送数据。

#! /usr/bin/env python3

import json
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

bootstrap = ['emr-header-1:9092']
topic = 'delta_stream_sample'

def gnerator():
    id = 0
    line = {}
    while True:
        line['id'] = id
        line['date'] = '2019-11-11'
        line['name'] = 'Robert'
        line['sales'] = 123
        yield line
        id = id + 1

def sendToKafka():
    producer = KafkaProducer(bootstrap_servers=bootstrap)

    for line in gnerator():
        data = json.dumps(line).encode('utf-8')

        # Asynchronous by default
        future = producer.send(topic, data)

        # Block for 'synchronous' sends
        try:
            record_metadata = future.get(timeout=10)
        except KafkaError as e:
            # Decide what to do if produce request failed
            pass
        time.sleep(0.1)

sendToKafka()
为了方便,数据只有id不一样。
{"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}

启动一个Spark Streaming作业,从Kafka读数据,写入Delta表。

  • Scala
    • bash
       spark-shell --master local --use-emr-datasource
    • scala
      import org.apache.spark.sql.{functions, SparkSession}
      import org.apache.spark.sql.types.DataTypes
      import org.apache.spark.sql.types.StructField
      
      val targetDir = "/tmp/delta_table"
      val checkpointLocation = "/tmp/delta_table_checkpoint"
      val bootstrapServers = "192.168.XX.XX:9092"
      val topic = "delta_stream_sample"
      
      val schema = DataTypes.createStructType(Array[StructField](
        DataTypes.createStructField("id", DataTypes.LongType, false),
        DataTypes.createStructField("date", DataTypes.DateType, false),
        DataTypes.createStructField("name", DataTypes.StringType, false),
        DataTypes.createStructField("sales", DataTypes.StringType, false)))
      
      val lines = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("subscribe", topic)
          .option("maxOffsetsPerTrigger", 1000)
          .option("startingOffsets", "earliest")
          .option("failOnDataLoss", value = false)
          .load()
          .select(functions.from_json(functions.col("value").cast("string"), schema).as("json"))
          .select("json.*")
      
      val query = lines.writeStream
          .outputMode("append")
          .format("delta")
          .option("checkpointLocation", checkpointLocation)
          .start(targetDir)
      
      query.awaitTermination()
  • SQL
    • bash
      streaming-sql --master local --use-emr-datasource
    • SQL
      CREATE TABLE IF NOT EXISTS kafka_table
      USING kafka
      OPTIONS(
      kafka.bootstrap.servers='192.168.XX.XX:9092',
      subscribe='delta_stream_sample'
      );
      
      CREATE TABLE IF NOT EXISTS delta_table (id LONG, `date` DATE, name STRING, sales STRING)
      USING delta
      LOCATION '/tmp/delta_table';
      
      CREATE SCAN stream_kafka_table on kafka_table USING STREAM
      OPTIONS(
      maxOffsetsPerTrigger='1000',
      startingOffsets='earliest',
      failOnDataLoss=false
      );
      
      CREATE STREAM job
      OPTIONS(
      checkpointLocation='/tmp/delta_table_checkpoint'
      )
      INSERT INTO delta_table
      SELECT
          content.id as id,
          content.date as date,
          content.name as name,
          content.sales as sales
      FROM (
          SELECT from_json(CAST(value as STRING), 'id LONG, `date` DATE, name STRING, sales STRING') as content
          FROM stream_kafka_table
      );

另新建一个spark-shell,确认已经读到数据。

  • Scala
    val df = spark.read.format("delta").load("/tmp/delta_table")
    df.select("*").orderBy("id").show(10000)
  • SQL
    SELECT * FROM delta_table ORDER BY id LIMIT 10000;

现在已经写入了2285条数据。

|2295|2019-11-11|Robert|  123|
|2296|2019-11-11|Robert|  123|
|2297|2019-11-11|Robert|  123|
|2275|2019-11-11|Robert|  123|
|2276|2019-11-11|Robert|  123|
|2277|2019-11-11|Robert|  123|
|2278|2019-11-11|Robert|  123|
|2279|2019-11-11|Robert|  123|
|2280|2019-11-11|Robert|  123|
|2281|2019-11-11|Robert|  123|
|2282|2019-11-11|Robert|  123|
|2283|2019-11-11|Robert|  123|
|2284|2019-11-11|Robert|  123|
|2285|2019-11-11|Robert|  123|
+----+----------+------+-----+

Exactly-Once测试

停掉Spark Streaming作业,再重新启动。重新读一下表,读数据正常的话,数据能够从上次断掉的地方衔接上。

  • Scala
    df.select("*").orderBy("id").show(10000)
  • SQL
    SELECT * FROM delta_table ORDER BY id LIMIT 10000;
    |2878|2019-11-11|Robert|  123|
    |2879|2019-11-11|Robert|  123|
    |2880|2019-11-11|Robert|  123|
    |2881|2019-11-11|Robert|  123|
    |2882|2019-11-11|Robert|  123|
    |2883|2019-11-11|Robert|  123|
    |2884|2019-11-11|Robert|  123|
    |2885|2019-11-11|Robert|  123|
    |2886|2019-11-11|Robert|  123|
    |2887|2019-11-11|Robert|  123|
    |2888|2019-11-11|Robert|  123|
    |2889|2019-11-11|Robert|  123|
    |2890|2019-11-11|Robert|  123|
    |2891|2019-11-11|Robert|  123|