Обновление схемы таблицы Delta Lake
Delta Lake позволяет обновлять схему таблицы. Поддерживаются следующие типы изменений.
- Добавление новых столбцов (в произвольных расположениях)
- Изменение порядка существующих столбцов
- Переименование существующих столбцов
Вы можете внести эти изменения явно с помощью DDL или неявно с помощью DML.
Внимание
Обновление схемы таблицы Delta — это операция, которая конфликтует со всеми параллельными операциями записи Delta.
При обновлении схемы таблицы Delta потоки чтения данных из этой таблицы будут завершены. Чтобы сохранить поток, необходимо перезапустить его. Рекомендуемые методы см . в разделе "Рекомендации по рабочей среде" для структурированной потоковой передачи.
Явное обновление схемы для добавления столбцов
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
По умолчанию допустимость значений NULL установлена как true
.
Чтобы добавить столбец во вложенное поле, используйте:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Например, если схема перед выполнением ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
:
- root
| - colA
| - colB
| +-field1
| +-field2
после этого схема выглядит так:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Примечание.
Добавление вложенных столбцов поддерживается только для структур. Массивы и карты не поддерживаются.
Явное обновление схемы изменения комментариев столбцов или упорядочения
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Чтобы изменить столбец во вложенном поле, используйте:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Например, если схема перед выполнением ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
:
- root
| - colA
| - colB
| +-field1
| +-field2
после этого схема выглядит так:
- root
| - colA
| - colB
| +-field2
| +-field1
Явное обновление схемы для замены столбцов
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Например, при выполнении следующего DDL:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
если до этого схема имеет следующий вид:
- root
| - colA
| - colB
| +-field1
| +-field2
после этого схема выглядит так:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Явное обновление схемы для переименования столбцов
Примечание.
Эта функция доступна в Databricks Runtime 10.4 LTS и выше.
Чтобы переименовать столбцы без перезаписи существующих данных в них, необходимо включить сопоставление столбцов для таблицы. См. раздел "Переименование и удаление столбцов" с сопоставлением столбцов Delta Lake.
Переименование столбцов:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Чтобы переименовать вложенное поле, сделайте следующее:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Например, при выполнении следующей команды:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Если до этого схема имеет следующий вид:
- root
| - colA
| - colB
| +-field1
| +-field2
После этого схема выглядит так:
- root
| - colA
| - colB
| +-field001
| +-field2
См. раздел "Переименование и удаление столбцов" с сопоставлением столбцов Delta Lake.
Явное обновление схемы для удаления столбцов
Примечание.
Эта функция доступна в Databricks Runtime 11.3 LTS и выше.
Чтобы удалить столбцы, оперируя только с метаданными, то есть без перезаписи файлов данных, для этой таблицы должно быть включено сопоставление столбцов. См. раздел "Переименование и удаление столбцов" с сопоставлением столбцов Delta Lake.
Внимание
Удаление столбца из метаданных не приводит к фактическому удалению данных этих столбцов из файлов данных. Чтобы очистить данные удаленного столбца, сначала выполните REORG TABLE для перезаписи файлов. Затем вы можете выполнить VACUUM для физического удаления файлов, содержащих данные удаленного столбца.
Чтобы удалить столбец, выполните приведенные действия.
ALTER TABLE table_name DROP COLUMN col_name
Чтобы удалить несколько столбцов, выполните следующее.
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Явное обновление схемы для изменения типа столбца или имени
Можно изменить тип или имя столбца или удалить столбец, перезаписав таблицу. Для этого используйте overwriteSchema
этот параметр.
В следующем примере показано изменение типа столбца:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
В следующем примере показано изменение имени столбца:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Включение эволюции схемы
Вы можете включить эволюцию схемы, выполнив одно из следующих действий:
- Задайте для
.option("mergeSchema", "true")
кадра данныхwrite
Spark илиwriteStream
операции. Чтобы добавить новые столбцы, см. раздел "Включить эволюцию схемы" для записи. - Используйте
MERGE WITH SCHEMA EVOLUTION
синтаксис. См . синтаксис эволюции схемы для слияния. - Задайте для параметра Spark conf
spark.databricks.delta.schema.autoMerge.enabled
true
значение current SparkSession.
Databricks рекомендует включить эволюцию схемы для каждой операции записи, а не задать conf Spark.
При использовании параметров или синтаксиса для включения эволюции схемы в операции записи это имеет приоритет над conf Spark.
Примечание.
Для инструкций нет предложения INSERT INTO
по эволюции схемы.
Включение эволюции схемы для записи для добавления новых столбцов
Столбцы, которые присутствуют в исходном запросе, но отсутствующие из целевой таблицы, автоматически добавляются в рамках транзакции записи при включении эволюции схемы. См. раздел "Включить эволюцию схемы".
Регистр сохраняется при добавлении нового столбца. Новые столбцы добавляются в конец схемы таблицы. Если дополнительные столбцы находятся в структуре, они добавляются в конец структуры в целевой таблице.
В следующем примере показано использование mergeSchema
параметра с автозагрузчиком. См. статью об автозагрузчике.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
В следующем примере показано использование mergeSchema
параметра с пакетной операцией записи:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Автоматическая эволюция схемы для слияния Delta Lake
Эволюция схемы позволяет пользователям разрешать несоответствия схемы между целевой и исходной таблицей в слиянии. Он обрабатывает следующие два случая:
- Столбец в исходной таблице отсутствует в целевой таблице. Новый столбец добавляется в целевую схему, а его значения вставляются или обновляются с помощью исходных значений.
- Столбец в целевой таблице отсутствует в исходной таблице. Целевая схема остается без изменений; Значения в дополнительном целевом столбце остаются неизменными (для
UPDATE
) или имеют значениеNULL
(forINSERT
).
Необходимо вручную включить автоматическую эволюцию схемы. См. раздел "Включить эволюцию схемы".
Примечание.
В Databricks Runtime 12.2 LTS и более поздних версиях столбцы и поля структуры, присутствующих в исходной таблице, можно указать по имени в действиях вставки или обновления. В Databricks Runtime 11.3 LTS и ниже можно использовать только INSERT *
действия или UPDATE SET *
действия для эволюции схемы с слиянием.
В Databricks Runtime 13.3 LTS и более поздних версиях можно использовать эволюцию схемы с структурой, вложенными в карты, например map<int, struct<a: int, b: int>>
.
Синтаксис эволюции схемы для слияния
В Databricks Runtime 15.2 и более поздних версиях можно указать эволюцию схемы в инструкции слияния с помощью API таблиц SQL или Delta:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Примеры операций слияния с эволюцией схемы
Ниже приведено несколько примеров влияния операции merge
с развитием схемы и без него.
Столбцы | Запрос (в SQL) | Поведение без развития схемы (по умолчанию) | Поведение при развитии схемы |
---|---|---|---|
Целевые столбцы: key, value Исходные столбцы: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Схема таблицы остается неизменной; обновляются или вставляются только столбцы key и value . |
Схема таблицы меняется на (key, value, new_value) . Существующие записи со совпадениями обновляются с помощью value источника и new_value в источнике. Новые строки вставляются со схемой (key, value, new_value) . |
Целевые столбцы: key, old_value Исходные столбцы: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Действия UPDATE и INSERT вызывают ошибку, так как целевой столбец old_value не находится в источнике. |
Схема таблицы меняется на (key, old_value, new_value) . Существующие записи со совпадениями обновляются в new_value источнике, оставляя old_value без изменений. Новые записи вставляются с указанным key , new_value а NULL также для old_value . |
Целевые столбцы: key, old_value Исходные столбцы: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE выдает ошибку, поскольку столбец new_value не существует в целевой таблице. |
Схема таблицы меняется на (key, old_value, new_value) . Существующие записи со совпадениями обновляются в new_value источнике, оставляя old_value без изменений, и несоответствующие записи NULL введены для new_value . См. примечание (1). |
Целевые столбцы: key, old_value Исходные столбцы: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT выдает ошибку, поскольку столбец new_value не существует в целевой таблице. |
Схема таблицы меняется на (key, old_value, new_value) . Новые записи вставляются с указанным key , new_value а NULL также для old_value . Существующие записи введены NULL для new_value выхода old_value из него без изменений. См. примечание (1). |
(1) Это поведение доступно в Databricks Runtime 12.2 LTS и выше; Databricks Runtime 11.3 LTS и ниже ошибки в этом условии.
Исключение столбцов с слиянием Delta Lake
В Databricks Runtime 12.2 LTS и более поздних версиях можно использовать EXCEPT
предложения в условиях слияния для явного исключения столбцов. Поведение ключевого EXCEPT
слова зависит от того, включена ли эволюция схемы.
При отключенной EXCEPT
эволюции схемы ключевое слово применяется к списку столбцов в целевой таблице и позволяет исключить столбцы из UPDATE
или INSERT
действий. Для исключенных столбцов задано значение null
.
При включенной EXCEPT
эволюции схемы ключевое слово применяется к списку столбцов в исходной таблице и позволяет исключить столбцы из эволюции схемы. Новый столбец в источнике, который отсутствует в целевом объекте, не добавляется в целевую схему, если он указан в предложении EXCEPT
. Исключенные столбцы, которые уже присутствуют в целевом объекте, имеют null
значение .
В следующих примерах показан этот синтаксис:
Столбцы | Запрос (в SQL) | Поведение без развития схемы (по умолчанию) | Поведение при развитии схемы |
---|---|---|---|
Целевые столбцы: id, title, last_updated Исходные столбцы: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
Сопоставленные строки обновляются путем задания last_updated поля текущей дате. Новые строки вставляются с помощью значений и id title . Для исключенного поля задано значение last_updated null . Поле review игнорируется, так как оно не находится в целевом объекте. |
Сопоставленные строки обновляются путем задания last_updated поля текущей дате. Схема развивается для добавления поля review . Новые строки вставляются с помощью всех исходных полей, за исключением last_updated которых задано null значение . |
Целевые столбцы: id, title, last_updated Исходные столбцы: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT выдает ошибку, поскольку столбец internal_count не существует в целевой таблице. |
Сопоставленные строки обновляются путем задания last_updated поля текущей дате. Поле review добавляется в целевую таблицу, но internal_count поле игнорируется. Для новых вставленных строк задано last_updated значение null . |
Работа со NullType
столбцами в обновлениях схемы
Поскольку Parquet не поддерживает NullType
, столбцы NullType
удаляются из кадров данных при записи в таблицы Delta, но по-прежнему хранятся в схеме. Если для этого столбца получен другой тип данных, Delta Lake объединит схему с новым типом данных. Если Delta Lake получает NullType
для существующего столбца, старая схема сохраняется, а новый столбец будет удален во время записи.
NullType
в потоковой передаче не поддерживается. Так как при использовании потоковой передачи необходимо задать схемы, эта операция применяется очень редко. NullType
также не принимается для сложных типов, таких как ArrayType
и MapType
.
Замена схемы таблицы
По умолчанию перезапись данных в таблице не приводит к перезаписи схемы. При перезаписи таблицы, использующей mode("overwrite")
без replaceWhere
, вы по-прежнему можете перезаписать схему записываемых данных. Замените схему и секционирование таблицы, задав для параметра overwriteSchema
значение true
:
df.write.option("overwriteSchema", "true")
Внимание
Невозможно указать overwriteSchema
, как true
при использовании динамической перезаписи секции.