更新 Delta Lake 表架构
Delta Lake 允许你更新表的架构。 支持下列类型的更改:
- 添加新列(在任意位置)
- 重新排列现有列
- 重命名现有列
你可以使用 DDL 显式地或使用 DML 隐式地进行这些更改。
重要
对 Delta 表架构的更新是与所有并发 Delta 写入操作冲突的操作。
更新 Delta 表架构时,从该表进行读取的流会终止。 如果你希望流继续进行,必须重启它。 有关建议的方法,请参阅结构化流式处理的生产注意事项。
显式更新架构以添加列
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
默认情况下,为 Null 性为 true
。
若要将列添加到嵌套字段,请使用:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
例如,如果运行 ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
之前的架构为:
- root
| - colA
| - colB
| +-field1
| +-field2
则运行之后的架构为:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
注意
仅支持为结构添加嵌套列。 不支持数组和映射。
显式更新架构以更改列注释或排序
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
若要更改嵌套字段中的列,请使用:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
例如,如果运行 ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
之前的架构为:
- root
| - colA
| - colB
| +-field1
| +-field2
则运行之后的架构为:
- root
| - colA
| - colB
| +-field2
| +-field1
显式更新架构以替换列
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
例如,运行以下 DDL 时:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
如果运行之前的架构为:
- root
| - colA
| - colB
| +-field1
| +-field2
则运行之后的架构为:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
显式更新架构以重命名列
注意
此功能在 Databricks Runtime 10.4 LTS 和更高版本中可用。
若要在不重写任何列现有数据的情况下重命名列,则必须启用表的列映射。 请参阅使用 Delta Lake 列映射重命名和删除列。
重命名列:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
重命名嵌套字段:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
例如,运行以下命令时:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
如果运行之前的架构为:
- root
| - colA
| - colB
| +-field1
| +-field2
则运行之后的架构为:
- root
| - colA
| - colB
| +-field001
| +-field2
显式更新架构以删除列
注意
此功能在 Databricks Runtime 11.3 LTS 和更高版本中可用。
要在不重写任何数据文件的情况下以仅元数据操作删除列,必须为表启用列映射。 请参阅使用 Delta Lake 列映射重命名和删除列。
重要
从元数据中删除列不会删除文件中该列的基础数据。 若要清除已删除的列数据,可以使用 REORG TABLE 重写文件。 然后,可以使用 VACUUM 以物理方式删除包含已删除的列数据的文件。
删除列:
ALTER TABLE table_name DROP COLUMN col_name
删除多列:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
显式更新架构以更改列类型或名称
可以通过重写表来更改列的类型或名称或删除列。 为此,请使用 overwriteSchema
选项。
以下示例演示如何更改列类型:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
以下示例演示如何更改列名称:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
启用架构演变
可以通过执行以下操作之一来启用架构演变:
- 将
.option("mergeSchema", "true")
设置为 Spark DataFramewrite
或writeStream
操作。 请参阅为写入操作启用架构演变以添加新列。 - 使用
MERGE WITH SCHEMA EVOLUTION
语法。 请参阅“合并的架构演变语法”。 - 将当前 SparkSession 的 Spark conf
spark.databricks.delta.schema.autoMerge.enabled
设置为true
。
Databricks 建议为每个写入操作启用架构演变,而不是设置 Spark conf。
使用选项或语法在写入操作中启用架构演变时,这优先于 Spark conf。
注意
INSERT INTO
语句没有架构演变子句。
为写入操作启用架构演变以添加新列
启用架构演变时,源查询中存在但目标表中缺少的列会自动添加为写入事务的一部分。 请参阅启用架构演变。
追加新列时,会保留大小写。 新列将添加到表架构的末尾。 如果其他列位于结构中,它们将被追加到目标表中结构的末尾。
以下示例演示如何将 mergeSchema
选项与自动加载程序配合使用。 请参阅什么是自动加载程序?。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
以下示例演示如何将 mergeSchema
选项与批处理写入操作配合使用:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Delta Lake 合并的自动架构演变
用户可以使用架构演变来解决合并的目标表与源表之间的架构不匹配问题。 此功能可处理以下两种情况:
- 源表中的列在目标表中不存在。 新列已添加到目标架构,其值是使用源值插入或更新的。
- 目标表中的列在源表中不存在。 目标架构保持不变;附加目标列中的值保持不变(对于
UPDATE
)或设置为NULL
(对于INSERT
)。
必须手动启用自动架构演变。 请参阅启用架构演变。
注意
在 Databricks Runtime 12.2 LTS 及更高版本中,可以在插入或更新操作中按名称指定源表中的列和结构字段。 在 Databricks Runtime 11.3 LTS 及以下版本中,只有 INSERT *
或 UPDATE SET *
操作可用于通过合并进行架构演变。
在 Databricks Runtime 13.3 LTS 及更高版本中,可以将架构演变与嵌套在映射中的结构一起使用,例如 map<int, struct<a: int, b: int>>
。
用于合并的架构演变语法
在 Databricks Runtime 15.2 及更高版本中,可以使用 SQL 或 Delta 表 API 在合并语句中指定架构演变:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
使用架构演变的合并操作示例
以下示例展示了在有架构演变和没有架构演变的情况下 merge
操作的效果。
列 | 查询(在 SQL 中) | 无架构演变的行为(默认值) | 有架构演变的行为 |
---|---|---|---|
目标列:key, value 源列: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
表架构保持不变;仅更新/插入列 key 、value 。 |
表架构更改为 (key, value, new_value) 。 使用源中的 value 和 new_value 更新具有匹配项的现有记录。 使用架构 (key, value, new_value) 插入新行。 |
目标列:key, old_value 源列: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
UPDATE 和 INSERT 操作会引发错误,因为目标列 old_value 不在源中。 |
表架构更改为 (key, old_value, new_value) 。 使用源中的 new_value 更新具有匹配项的现有记录,而 old_value 保持不变。 使用 key 的指定 new_value 、NULL 和 old_value 插入新记录。 |
目标列:key, old_value 源列: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE 引发错误,因为目标表中不存在列 new_value 。 |
表架构更改为 (key, old_value, new_value) 。 使用源中的 new_value 更新具有匹配项的现有记录,old_value 保持不变,不匹配的记录包含针对 NULL 输入的 new_value 。 参阅注释 (1)。 |
目标列:key, old_value 源列: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT 引发错误,因为目标表中不存在列 new_value 。 |
表架构更改为 (key, old_value, new_value) 。 使用 key 的指定 new_value 、NULL 和 old_value 插入新记录。 现有记录包含针对 NULL 输入的 new_value ,而 old_value 保持不变。 参阅注释 (1)。 |
(1) 此行为在 Databricks Runtime 12.2 及更高版本中可用;Databricks Runtime 11.3 LTS 及以下版本在这种情况下会出错。
使用 Delta Lake 合并排除列
在 Databricks Runtime 12.2 LTS 及更高版本中,可以在合并条件中使用 EXCEPT
子句显式排除列。 EXCEPT
关键字的行为因是否启用架构演变而异。
禁用架构演变后,EXCEPT
关键字将应用于目标表中的列列表,并允许从 UPDATE
或 INSERT
操作中排除列。 排除的列设置为 null
。
启用架构演变后,EXCEPT
关键字将应用于源表中的列列表,并允许从架构演变中排除列。 如果 EXCEPT
子句中列出了源中不存在的新列,则不会将其添加到目标架构中。 目标中已存在的排除列设置为 null
。
以下示例演示了这些语法:
列 | 查询(在 SQL 中) | 无架构演变的行为(默认值) | 有架构演变的行为 |
---|---|---|---|
目标列:id, title, last_updated 源列: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
通过将 last_updated 字段设置为当前日期来更新匹配的行。 使用 id 和 title 的值插入新行。 排除的字段 last_updated 设置为 null 。 review 字段被忽略,因为它不在目标中。 |
通过将 last_updated 字段设置为当前日期来更新匹配的行。 架构已演变为添加 review 字段。 使用除 last_updated 设置为 null 外的所有源字段插入新行。 |
目标列:id, title, last_updated 源列: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT 引发错误,因为目标表中不存在列 internal_count 。 |
通过将 last_updated 字段设置为当前日期来更新匹配的行。 review 字段将添加到目标表,但忽略该 internal_count 字段。 插入的新行已将 last_updated 设置为 null 。 |
处理架构更新中的 NullType
列
由于 Parquet 不支持 NullType
,因此在写入到 Delta 表时会从数据帧中删除 NullType
列,但仍会将其存储在架构中。 如果为该列接收到不同的数据类型,则 Delta Lake 会将该架构合并到新的数据类型。 如果 Delta Lake 接收到现有列的 NullType
,则在写入过程中会保留旧架构并删除新列。
不支持流式处理中的 NullType
。 由于在使用流式处理时必须设置架构,因此这应该非常罕见。 对于复杂类型(例如 NullType
和 ArrayType
),也不会接受 MapType
。
替换表架构
默认情况下,覆盖表中的数据不会覆盖架构。 在不使用 mode("overwrite")
的情况下使用 replaceWhere
来覆盖表时,你可能还希望覆盖写入的数据的架构。 你可以通过将 overwriteSchema
选项设置为 true
来替换表的架构和分区:
df.write.option("overwriteSchema", "true")
重要
使用动态分区覆盖时,不能将 overwriteSchema
指定为 true
。