Выборочно перезаписывать данные с помощью Delta Lake
Azure Databricks использует функции Delta Lake для поддержки двух различных вариантов выборочного перезаписи:
- Параметр
replaceWhere
атомарно заменяет все записи, соответствующие заданному предикату. - Можно заменить каталоги данных на основе разбиения tables, используя динамическое перезаписывание partition.
Для большинства операций Databricks рекомендует указать replaceWhere
, какие данные следует перезаписать.
Внимание
Если данные были случайно перезаписаны, можно использовать restore для отмены изменения.
Произвольная выборочная перезапись с помощью replaceWhere
Вы можете выборочно перезаписать только данные, соответствующие произвольному выражению.
Примечание.
ДЛЯ SQL требуется Databricks Runtime 12.2 LTS или более поздней версии.
Следующая команда атомарно заменяет события в январе в целевом объекте table, который секционируется по start_date
, данными из replace_data
:
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Этот пример кода записывает данные в replace_data
, проверяет соответствие всех строк предикату и выполняет атомарную замену с помощью семантики overwrite
. Если любая values в операции выходит за пределы constraint, эта операция завершается ошибкой по умолчанию.
Это поведение можно изменить на overwrite
values в диапазоне предиката и insert
записей, которые выходят за пределы указанного диапазона. Для этого отключите проверку constraint, установив для параметра spark.databricks.delta.replaceWhere.constraintCheck.enabled
значение false, используя один из следующих параметров:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Устаревшее поведение
Устаревшее поведение по умолчанию подразумевало, что replaceWhere
перезаписывал данные, соответствующие предикату, только на partitionиcolumns. В этой устаревшей модели следующая команда атомарно заменит месяц январь в целевом table, который разбивается на секции date
, на данные из df
:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Если вы хотите вернуться к старому поведению, можно отключить spark.databricks.delta.replaceWhere.dataColumns.enabled
флаг:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
динамические перезаписи partition
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Databricks Runtime 11.3 LTS и более поздние версии поддерживают динамическийpartition режим перезаписи для секционированных tables. Для tables с несколькими разделами Databricks Runtime 11.3 LTS и ниже поддерживают только динамические перезаписи partition, если все partitioncolumns имеют одинаковый тип данных.
При использовании динамического режима перезаписи partition все существующие данные в каждом логическом partition, в который были записаны новые данные, перезаписываются. Все существующие логические секции, для которых запись не содержит данных, остаются неизменными. Этот режим применяется только в том случае, если данные записываются в режиме перезаписи: INSERT OVERWRITE
в SQL или запись DataFrame с df.write.mode("overwrite")
.
Настройте режим перезаписи partition динамически, задав конфигурацию сеанса Spark spark.sql.sources.partitionOverwriteMode
на dynamic
. Его также можно включить, задав для параметра DataFrameWriter
в partitionOverwriteMode
значение dynamic
. Если задан параметр на уровне запроса, он переопределяет режим, указанный в конфигурации сеанса. Значение partitionOverwriteMode
по умолчанию — static
.
Внимание
Убедитесь, что данные, записанные с помощью динамической partition, перезаписывают только ожидаемые секции. Одна строка в неправильной partition может привести к непреднамеренному перезаписи всего partition.
В следующем примере показано использование динамических замещений partition.
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Примечание.
- Динамические partition перезаписывают конфликты с параметром
replaceWhere
секционированных tables.- Если в конфигурации сеанса Spark включена динамическая перезапись partition и
replaceWhere
предоставляется как параметрDataFrameWriter
, тогда Delta Lake перезаписывает данные в соответствии с выражениемreplaceWhere
(параметры, относящиеся к запросу, переопределяют конфигурации сеанса). - Вы получите ошибку, если у параметров
DataFrameWriter
активированы как динамическое partition перезаписи, так иreplaceWhere
.
- Если в конфигурации сеанса Spark включена динамическая перезапись partition и
- Невозможно указать
overwriteSchema
какtrue
при использовании динамической partition перезаписи.