文档

表实用程序命令

更新时间:
一键部署
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

Delta 表支持许多实用程序命令。

说明

详细文章请参考Databricks官网文章:表实用程序命令

有关演示这些功能的Databricks笔记本,请参阅入门笔记本二

删除Delta表不再引用的文件

您可以通过在表上运行vacuum命令来删除Delta表不再引用且早于保留阈值的文件。vacuum 不会自动触发。文件的默认保留阈值为7天。

警告

  • vacuum仅删除数据文件,而不删除日志文件。检查点操作后,日志文件将自动异步删除。日志文件的默认保留期为30天,可通过使用ALTER TABLE SET TBLPROPERTIES SQL方法设置的delta.logRetentionPeriod属性进行配置。请参阅表属性。

  • 运行vacuum后,无法再按时间顺序查看在保留期之前创建的版本。

SQL

%sql
VACUUM eventsTable   -- vacuum files not required by versions older than the default retention period

VACUUM '/data/events' -- vacuum files in path-based table

VACUUM delta.`/data/events/`

VACUUM delta.`/data/events/` RETAIN 100 HOURS  -- vacuum files not required by versions more than 100 hours old

VACUUM eventsTable DRY RUN    -- do dry run to get the list of files to be deleted

Python

%pyspark
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)  # path-based tables, or
deltaTable = DeltaTable.forName(spark, tableName)    # Hive metastore-based tables

deltaTable.vacuum()        # vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     # vacuum files not required by versions more than 100 hours old

Scala

%spark
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old

有关语法的详细信息,请参见

  • Databricks Runtime 7.0及更高版本:VACUUM

  • Databricks Runtime 6.x及以下:VACUUM

  • 有关Scala,Java和Python语法的详细信息,请参见API参考

警告

我们不建议您将保留间隔设置为短于7天,因为并发的读取器或写入器仍然可以将旧快照和未提交的文件用于表。如果 vacuum清除活动文件,则并发阅读器可能会失败,或者更糟的是,当vacuum删除尚未提交的文件时,表可能会损坏。

Delta Lake具有一项安全检查,以防止您执行危险的vacuum命令。如果您确定在此表上执行的操作没有超过计划指定的保留时间间隔,你可以通过设置ApacheSpark属性spark.databricks.delta.retentionDurationCheck.enabled设置为false来关闭此安全检查。选择的时间间隔,必须比最长的并发事务长,也必须比任何流可以滞后于表的最新更新的最长时间长。

检索Delta表历史记录

您可以通过运行history命令来检索每次写入Delta表的操作、用户、时间戳等信息。以相反的时间顺序返回操作。默认情况下,表历史记录会保留30天。

SQL

%sql
DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable
查看历史表

Python

%pyspark
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)

fullHistoryDF = deltaTable.history()    # get the full history of the table

lastOperationDF = deltaTable.history(1) # get the last operation

Scala

%spark
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table

val lastOperationDF = deltaTable.history(1) // get the last operation

有关Spark SQL语法的详细信息,请参见

有关Scala,Java和Python语法的详细信息,请参见API参考

历史架构

类型

说明

版本

long

通过操作生成的表版本

timestamp

timestamp

提交此版本的时间

userId

字符串

运行操作的用户的ID

userName

字符串

运行操作的用户的姓名。

operation

字符串

操作的名称。

operationParameters

map

操作的参数(例如谓词。)

作业(job)

struct

运行操作的作业的详细信息。

笔记本

struct

运行操作的笔记本的详细信息。

clusterId

字符串

运行操作的集群的 ID。

readVersion

long

读取以执行写入操作的表的版本。

isolationLevel

字符串

用于此操作的隔离级别。

isBlindAppend

boolean

此操作是否追加数据。

operationMetrics

map

操作的指标(例如已修改的行数和文件数。)

userMetadata

字符串

用户定义的提交元数据(如果已指定)

该history操作的输出包含以下列。

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
说明
  • 仅当使用 Databricks Runtime 6.5 或更高版本运行历史记录中的 history 命令和操作时,操作指标才可用。

  • 如果使用以下方法写入Delta表,则其他一些列将不可用:

    • JDBC或ODBC

    • JAR工作

    • Spark提交工具

    • 使用REST API运行命令

  • 将来添加的列将始终添加在最后一列之后。

操作指标键

该history操作在operationMetrics列映射中返回操作指标的集合。

下表按操作列出了映射键定义。

操作方式

指标名称

描述

WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO

numFiles

写入的文件数。

numOutputBytes

已写入的内容的大小(以字节为单位)。

numOutputRows

写入的行数。

STREAMING UPDATE

numAddedFiles

添加的文件数。

numRemovedFiles

删除的文件数。

numOutputRows

写入的行数。

numOutputBytes

写入大小(以字节为单位)。

DELETE

numAddedFiles

添加的文件数。 删除表的分区时未提供。

numRemovedFiles

删除的文件数。

numDeletedRows

删除的行数。 删除表的分区时未提供。

numCopiedRows

在删除文件期间复制的行数。

TRUNCATE

numRemovedFiles

删除的文件数。

MERGE

numSourceRows

源数据帧中的行数。

numTargetRowsInserted

插入到目标表的行数。

numTargetRowsUpdated

目标表中更新的行数。

numTargetRowsDeleted

目标表中删除的行数。

numTargetRowsCopied

复制的目标行数。

numOutputRows

写出的总行数。

numTargetFilesAdded

添加到接收器(目标)的文件数。

numTargetFilesRemoved

从接收器(目标)删除的文件数。

UPDATE

numAddedFiles

添加的文件数

numRemovedFiles

删除的文件数。

numUpdatedRows

更新的行数。

numCopiedRows

刚才在更新文件期间复制的行数。

FSCK

numRemovedFiles

删除的文件数。

CONVERT

numConvertedFiles

已转换的 Parquet 文件数。

操作方式

指标名称

描述

CLONE

sourceTableSize

所克隆版本的源表的大小(以字节为单位)。

sourceNumOfFiles

源表中已克隆版本的文件数。

numRemovedFiles

目标表中删除的文件数(如果替换了先前的 Delta 表)。

removedFilesSize

如果替换了先前的 Delta 表,则为目标表中删除文件的总大小(以字节为单位)。

numCopiedFiles

复制到新位置的文件数。 如果是浅表克隆,则为 0。

copiedFilesSize

复制到新位置的文件总大小(以字节为单位)。 如果是浅表克隆,则为 0。

RESTORE

tableSizeAfterRestore

还原后的表大小(以字节为单位)。

numOfFilesAfterRestore

还原后表中的文件数。

numRemovedFiles

还原操作删除的文件数。

numRestoredFiles

由于还原而添加的文件数。

removedFilesSize

还原删除的文件的大小(以字节为单位)。

restoredFilesSize

还原添加的文件的大小(以字节为单位)。

OPTIMIZE

numAddedFiles

添加的文件数。

numRemovedFiles

优化的文件数。

numAddedBytes

优化表后添加的字节数。

numRemovedBytes

删除的字节数。删除的字节数。

minFileSize

优化表后最小文件的大小。

p25FileSize

优化表后第 25 个百分位文件的大小。

p50FileSize

优化表后的文件大小中值。

p75FileSize

优化表后第 75 个百分位文件的大小。

maxFileSize

优化表后最大文件的大小。

  • 需要Databricks Runtime 7.3或更高版本。

  • 需要Databricks Runtime 7.4或更高版本。

检索Delta表详细信息

可以使用“描述详细信息”检索有关增量表的详细信息(例如,文件数、数据大小)。

SQL

%sql
DESCRIBE DETAIL '/data/events/'

DESCRIBE DETAIL eventsTable

详细架构

此操作的输出只有一行具有以下架构。

类型

说明

format

字符串

表的格式,即“delta”。

id

字符串

表的唯一 ID。

name

字符串

在元存储中定义的表名称。

description

字符串

表的说明。

location

字符串

表的位置。

createdAt

timestamp

表创建时间。

lastModified

timestamp

表的上次修改时间。

partitionColumns

字符串数组

如果表已分区,则为分区列的名称。

numFiles

long

表最新版本中的文件数。

properties

string-string map

此表的所有属性集。

minReaderVersion

int

可读取表的读取器最低版本(由日志协议而定)。

minWriterVersion

int

可写入表的写入器最低版本(由日志协议而定)。

+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
|format|                  id|              name|description|            location|           createdAt|       lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|d31f82d2-a69f-42e...|default.deltatable|       null|file:/Users/tdas/...|2020-06-05 12:20:...|2020-06-05 12:20:20|              []|      10|      12345|        []|               1|               2|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+

生成清单文件

您可以为Delta表生成清单文件,供其他处理引擎(即Apache Spark以外的其他引擎)用来读取Delta表。例如,要生成清单文件,Presto和Athena可以使用它们来读取Delta表,请运行以下命令:

SQL

%sql
GENERATE symlink_format_manifest FOR TABLE delta.`/mnt/events`

GENERATE symlink_format_manifest FOR TABLE eventsTable

Python

%pyspark
deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

Scala

%spark
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

将Parquet表转换为Delta表

就地将Parquet表转换为Delta表。此命令会列出目录中的所有文件,创建一个Delta Lake事务日志来跟踪这些文件,并通过读取所有Parquet文件的页脚自动推断数据架构。如果您的数据已分区,则必须将分区列的架构指定为DDL格式的字符串(即)。<column-name1> <type>, <column-name2> <type>, ...

SQL

%SQL
-- Convert unpartitioned parquet table at path '<path-to-table>'
CONVERT TO DELTA parquet.`<path-to-table>`

-- Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
CONVERT TO DELTA parquet.`<path-to-table>` PARTITIONED BY (part int, part2 int)
将parquet文件转换为delta

Python

%pyspark
from delta.tables import *

# Convert unpartitioned parquet table at path '<path-to-table>'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")

# Convert partitioned parquet table at path '<path-to-table>' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int")
重要

可在 Databricks Runtime 6.1 及更高版本中使用 Python API。

Scala

%spark
import io.delta.tables._

// Convert unpartitioned Parquet table at path '<path-to-table>'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")

// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int")
重要

可在 Databricks Runtime 6.0 及更高版本中使用 Scala API。

有关语法的详细信息,请参见

重要

Delta Lake跟踪的文件都是不可见的,运行vacuum时可以删除。您勿在转换过程中更新或附加数据文件。转换表后,请确保通过Delta Lake写入所有文件。

将Delta表转换为Parquet表

您可以使用以下步骤轻松地将Delta表转换回Parquet表:

  1. 如果执行了可以更改数据文件的Delta Lake操作(例如delete或merge),请运行vacuum并将保留期限设为0小时,从而删除表的最新版本中未包含的所有数据文件。

  2. 删除目录中的_delta_log目录。

将Delta表还原到较早的状态

说明

此功能目前以 公共预览版提供。

重要

在Databricks Runtime 7.4及更高版本中可用。

您可以使用以下RESTORE命令将Delta表还原到其早期状态。Delta表在内部维护该表的历史版本,以使其能够还原到较早的状态。RESTORE命令支持与早期状态相对应的版本或创建早期状态的时间戳。

警告

  • 您可以还原已经还原的表和克隆的表。

  • 将表还原到手动或删除数据文件的旧版本vacuum将失败。如果spark.sql.files.ignoreMissingFiles设置为true,仍然可以部分还原到该版本。

  • 恢复到较早状态的时间戳格式为yyyy-MM-dd HH:mm:ssyyyy-MM-dd。还支持仅提供date()字符串。

SQL

%sql
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Python

%pyspark
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, <path-to-table>)  # path-based tables, or
deltaTable = DeltaTable.forName(spark, <table-name>)    # Hive metastore-based tables

deltaTable.restoreToVersion(0) # restore table to oldest version

deltaTable.restoreToTimestamp('2019-02-14') # restore to a specific timestamp

Scala

%spark
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, <path-to-table>)
val deltaTable = DeltaTable.forName(spark, <table-name>)

deltaTable.restoreToVersion(0) // restore table to oldest version

deltaTable.restoreToTimestamp("2019-02-14") // restore to a specific timestamp

有关语法的详细信息,请参见RESTORE(Databricks上的Delta Lake)

表访问控制

您必须MODIFY对要还原的表具有权限

克隆 Delta 表

说明

此功能目前以 公共预览版提供。

重要

在Databricks Runtime 7.2及更高版本中可用。

您可以使用以下clone命令以特定版本创建现有Delta表的副本。克隆可以是深层或浅层。

  • 深层克隆是指除了现有表的元数据外,还会将源表数据复制到克隆目标的克隆。 此外,它还会克隆流元数据,使写入 Delta 表的流可在源表上停止,并在克隆的目标位置(即停止位置)继续进行克隆。

  • 浅表克隆不会将数据文件复制到克隆目标。 表元数据等效于源。 创建这些克隆的成本较低。

对深层克隆或浅层克隆所做的任何更改都只会影响克隆本身,而不会影响源表。

克隆的元数据包括:架构,分区信息,不变量,可为Null性。对于深层克隆,还将克隆流和COPY INTO(Databricks上的Delta Lake)元数据。未克隆的元数据是表描述和用户定义的提交元数据。

警告

  • 浅克隆引用源目录中的数据文件。如果在源表上运行vacuum,客户端将无法再读取引用的数据文件,并将引发FileNotFoundException。在这种情况下,在浅层克隆上运行clone with replace将修复克隆。如果经常发生这种情况,请考虑使用不依赖于源表的深层克隆。

  • 深度克隆不依赖于其克隆来源,因为深度克隆会复制数据以及元数据,所以创建深度克隆的成本很高。

  • 使用 replace 克隆到已在该路径具有表的目标时,如果该路径不存在,会创建一个 Delta 日志。您可以通过vacuum运行清理任何现有数据。如果现有表是Delta表,则会在现有Delta表上创建新的提交,其中包括源表中的新元数据和新数据。

  • 克隆表与 Create Table As Select 或 CTAS 不同,除了数据之外,克隆还复制源表的元数据。克隆的语法更为简单:不需要指定分区、格式、不变量、可Null性等,因为它们取自源表。

  • 克隆表与其具有源表无关的历史记录。在克隆表上的按时间查询时,这些查询使用的输入与它们在其源表上查询时使用的不同。

SQL

%sql
CREATE TABLE delta.`/data/target/` CLONE delta.`/data/source/` -- Create a deep clone of /data/source at /data/target

 CREATE OR REPLACE TABLE db.target_table CLONE db.source_table -- Replace the target

 CREATE TABLE IF NOT EXISTS TABLE delta.`/data/target/` CLONE db.source_table -- No-op if the target table exists

 CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source`

 CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` VERSION AS OF version

 CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` TIMESTAMP AS OF timestamp_expression -- timestamp can be like “2019-01-01” or like date_sub(current_date(), 1)

Python

%pyspark
from delta.tables import *

 deltaTable = DeltaTable.forPath(spark, pathToTable)  # path-based tables, or
 deltaTable = DeltaTable.forName(spark, tableName)    # Hive metastore-based tables

 deltaTable.clone(target, isShallow, replace) # clone the source at latest version

 deltaTable.cloneAtVersion(version, target, isShallow, replace) # clone the source at a specific version

# clone the source at a specific timestamp such as timestamp=“2019-01-01”
 deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace)

Scala

%spark
import io.delta.tables._

 val deltaTable = DeltaTable.forPath(spark, pathToTable)
 val deltaTable = DeltaTable.forName(spark, tableName)

 deltaTable.clone(target, isShallow, replace) // clone the source at latest version

 deltaTable.cloneAtVersion(version, target, isShallow, replace) // clone the source at a specific version

 deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace) // clone the source at a specific timestamp

有关语法的详细信息,请参见CLONE(Databricks上的Delta Lake)

权限

您必须CLONEDatabricks表访问控制和您的云提供的所需的权限。

表访问控制

深层和浅层克隆都需要具有以下权限:

  • 源表上的SELECT权限

  • 如果要CLONE用于创建新表,请对创建表的数据库具有CREATE权限。

  • 如果要CLONE用来替换表,则必须具有该表的MODIFY权限。

云权限

如果创建了深度克隆,则任何读取该深度克隆的用户都必须具有对该克隆目录的读取权限。要更改克隆,用户必须具有对克隆目录的写入权限。

如果创建了浅表克隆,则读取浅表克隆的任何用户都需要权限才能读取原始表中的文件,因为数据文件保留在源表中,并且包含浅表克隆以及该表的目录。若要更改克隆,用户需要对克隆目录具有写入权限。

克隆用例

  • 数据存档:数据保存的时间可能会比按时间查看或灾难恢复的时间更长。在这些情况下,您可以创建一个深层克隆,保留表的某个时间点的状态以供存档。还可以通过增量存档保留源表的持续更新状态,以进行灾难恢复。

    SQL

    %sql
    -- Every month run
    CREATE OR REPLACE TABLE delta.`/some/archive/path` CLONE my_prod_table

  • 机器学习流重现:在进行机器学习时,你可能希望将已训练 ML 模型的表的特定版本进行存档。可以使用此存档数据集测试将来的模型。

    SQL

    %sql
    -- Trained model on version 15 of Delta table
    CREATE TABLE delta.`/model/dataset` CLONE entire_dataset VERSION AS OF 15

  • 在生产表上进行短期实验:为了在不损坏表的情况下测试生产表中的工作流,可以轻松创建浅表克隆。这样,就可在包含所有生产数据的克隆表上运行任意工作流,而不会影响任何生产工作负载。

    SQL

    %sql
    -- Perform shallow clone
    CREATE OR REPLACE TABLE my_test SHALLOW CLONE my_prod_table;
    
    UPDATE my_test WHERE user_id is null SET invalid=true;
    -- Run a bunch of validations. Once happy:
    
    -- This should leverage the update information in the clone to prune to only
    -- changed files in the clone if possible
    MERGE INTO my_prod_table
    USING my_test
    ON my_test.user_id <=> my_prod_table.user_id
    WHEN MATCHED AND my_test.user_id is null THEN UPDATE *;
    
    DROP TABLE my_test;

  • 数据共享:单个组织内的其他业务部门可能希望访问上述的数据,但可能不需要最新更新。可以为不同的业务部门提供具有不同权限的克隆,而不是直接授予对源表的访问权限。克隆的性能比简单视图的性能更高。

    SQL

    %sql
    -- Perform deep clone
    CREATE OR REPLACE TABLE shared_table CLONE my_prod_table;
    
    -- Grant other users access to the shared table
    GRANT SELECT ON shared_table TO `<user-name>@<user-domain>.com`;

  • 本页导读 (0)
文档反馈