在 Spark 中使用增量表
可以使用增量表(或增量格式文件)以多种方式检索和修改数据。
使用 Spark SQL
在 Spark 中处理增量表中数据的最常见方法是使用 Spark SQL。 可以使用 spark.sql 库以其他语言(如 PySpark 或 Scala)嵌入 SQL 语句。 例如,以下代码在 products 表中插入一行。
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
或者,可以在笔记本中使用 %%sql
magic 来运行 SQL 语句。
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
使用 Delta API
如果要使用增量文件而不是目录表,使用 Delta Lake API 可能更简单。 可以从包含增量格式文件的文件夹位置创建 DeltaTable 的实例,然后使用 API 修改表中的数据。
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
使用“按时间顺序查看”来处理表的版本控制
对增量表进行的修改记录在表的事务日志中。 可以使用记录的事务来查看对表所做的更改的历史记录,并检索旧版数据(称为“按时间顺序查看”)
若要查看表的历史记录,可以使用 SQL 命令 DESCRIBE
,如下所示。
%%sql
DESCRIBE HISTORY products
此语句的结果显示已应用于表的事务,如下所示(某些列已省略):
版本 | timestamp | operation | operationParameters |
---|---|---|---|
2 | 2023-04-04T21:46:43Z | UPDATE | {"predicate":"(ProductId = 1)"} |
1 | 2023-04-04T21:42:48Z | WRITE | {"mode":"Append","partitionBy":"[]"} |
0 | 2023-04-04T20:04:23Z | CREATE TABLE | {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"} |
要查看外部表的历史记录,可以指定文件夹位置,而不是表名。
%%sql
DESCRIBE HISTORY 'Files/mytable'
可以通过将增量文件位置读入数据帧来从特定版本的数据中检索数据,并将所需的版本指定为 versionAsOf
选项:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
或者,可以使用 timestampAsOf
选项指定时间戳:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)