Выборочно перезаписывать данные с помощью Delta Lake
Azure Databricks использует функции Delta Lake для поддержки двух различных вариантов выборочного перезаписи:
- Параметр
replaceWhere
атомарно заменяет все записи, соответствующие заданному предикату. - Можно заменить каталоги данных на основе секционирования таблиц с помощью динамических перезаписей секций.
Для большинства операций Databricks рекомендует указать replaceWhere
, какие данные следует перезаписать.
Внимание
Если данные были случайно перезаписаны, можно использовать восстановление для отмены изменения.
Произвольная выборочная перезапись с помощью replaceWhere
Вы можете выборочно перезаписать только данные, соответствующие произвольному выражению.
Примечание.
ДЛЯ SQL требуется Databricks Runtime 12.2 LTS или более поздней версии.
Следующая команда выполняет атомарную замену событий в январе в целевой таблице, которая секционирована по start_date
с использованием данных в replace_data
:
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Этот пример кода записывает данные в replace_data
, проверяет соответствие всех строк предикату и выполняет атомарную замену с помощью семантики overwrite
. Если какие-либо значения в операции выходят за пределы ограничения, эта операция завершается ошибкой по умолчанию.
Это поведение можно изменить на overwrite
значения в диапазоне предиката и insert
записи, которые выходят за пределы указанного диапазона. Для этого отключите проверку ограничения, установив spark.databricks.delta.replaceWhere.constraintCheck.enabled
значение false, используя один из следующих параметров:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Устаревшее поведение
Устаревшее поведение по умолчанию перезаписывало replaceWhere
данные, соответствующие предикату только для столбцов секций. В этой устаревшей модели следующая команда будет атомарно заменять месяц в целевой таблице, которая секционирована date
по данным в df
:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Если вы хотите вернуться к старому поведению, можно отключить spark.databricks.delta.replaceWhere.dataColumns.enabled
флаг:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
Динамические перезаписи секций
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Databricks Runtime 11.3 LTS и более поздних версий поддерживает режим динамической перезаписи секционирования для секционированных таблиц. Для таблиц с несколькими секциями Databricks Runtime 11.3 LTS и ниже поддерживаются только динамические перезаписи секций, если все столбцы секций имеют одинаковый тип данных.
При перезаписи динамического раздела операции перезаписывают все существующие данные в каждой логической секции, для которой запись фиксирует новые данные. Все существующие логические секции, для которых запись не содержит данных, остаются неизменными. Этот режим применяется только в том случае, если данные записываются в режиме перезаписи: INSERT OVERWRITE
в SQL или запись DataFrame с df.write.mode("overwrite")
.
Чтобы настроить режим динамической перезаписи секций, задайте для конфигурации сеанса Spark spark.sql.sources.partitionOverwriteMode
значение dynamic
. Его также можно включить, задав для параметра partitionOverwriteMode
в DataFrameWriter
значение dynamic
. Если задан параметр на уровне запроса, он переопределяет режим, указанный в конфигурации сеанса. Значение partitionOverwriteMode
по умолчанию — static
.
Внимание
Убедитесь в том, что данные, записываемые путем динамической перезаписи секций, относятся к соответствующим секциям. Наличие всего одной строки в неправильной секции может привести к непреднамеренной перезаписи всей секции.
В следующем примере показано использование динамических перезаписей секций:
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Примечание.
- Динамическая перезапись секций конфликтует с параметром
replaceWhere
секционированных таблиц.- Если в конфигурации сеанса Spark включена динамическая перезапись секций и в
DataFrameWriter
задан параметрreplaceWhere
, Delta Lake перезаписывает данные в соответствии с выражениемreplaceWhere
(параметры на уровне запроса переопределяют конфигурацию сеанса). - При наличии динамической перезаписи и
replaceWhere
включения параметров возникает ошибкаDataFrameWriter
.
- Если в конфигурации сеанса Spark включена динамическая перезапись секций и в
- Невозможно указать
overwriteSchema
, какtrue
при использовании динамической перезаписи секции.