Поделиться через


Выборочно перезаписывать данные с помощью Delta Lake

Azure Databricks использует функции Delta Lake для поддержки двух различных вариантов выборочного перезаписи:

  • Параметр replaceWhere атомарно заменяет все записи, соответствующие заданному предикату.
  • Можно заменить каталоги данных на основе разбиения tables, используя динамическое перезаписывание partition.

Для большинства операций Databricks рекомендует указать replaceWhere , какие данные следует перезаписать.

Внимание

Если данные были случайно перезаписаны, можно использовать restore для отмены изменения.

Произвольная выборочная перезапись с помощью replaceWhere

Вы можете выборочно перезаписать только данные, соответствующие произвольному выражению.

Примечание.

ДЛЯ SQL требуется Databricks Runtime 12.2 LTS или более поздней версии.

Следующая команда атомарно заменяет события в январе в целевом объекте table, который секционируется по start_date, данными из replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Этот пример кода записывает данные в replace_data, проверяет соответствие всех строк предикату и выполняет атомарную замену с помощью семантики overwrite . Если любая values в операции выходит за пределы constraint, эта операция завершается ошибкой по умолчанию.

Это поведение можно изменить на overwritevalues в диапазоне предиката и insert записей, которые выходят за пределы указанного диапазона. Для этого отключите проверку constraint, установив для параметра spark.databricks.delta.replaceWhere.constraintCheck.enabled значение false, используя один из следующих параметров:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Устаревшее поведение

Устаревшее поведение по умолчанию подразумевало, что replaceWhere перезаписывал данные, соответствующие предикату, только на partitionиcolumns. В этой устаревшей модели следующая команда атомарно заменит месяц январь в целевом table, который разбивается на секции date, на данные из df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Если вы хотите вернуться к старому поведению, можно отключить spark.databricks.delta.replaceWhere.dataColumns.enabled флаг:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

динамические перезаписи partition

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Databricks Runtime 11.3 LTS и более поздние версии поддерживают динамическийpartition режим перезаписи для секционированных tables. Для tables с несколькими разделами Databricks Runtime 11.3 LTS и ниже поддерживают только динамические перезаписи partition, если все partitioncolumns имеют одинаковый тип данных.

При использовании динамического режима перезаписи partition все существующие данные в каждом логическом partition, в который были записаны новые данные, перезаписываются. Все существующие логические секции, для которых запись не содержит данных, остаются неизменными. Этот режим применяется только в том случае, если данные записываются в режиме перезаписи: INSERT OVERWRITE в SQL или запись DataFrame с df.write.mode("overwrite").

Настройте режим перезаписи partition динамически, задав конфигурацию сеанса Spark spark.sql.sources.partitionOverwriteMode на dynamic. Его также можно включить, задав для параметра DataFrameWriter в partitionOverwriteMode значение dynamic. Если задан параметр на уровне запроса, он переопределяет режим, указанный в конфигурации сеанса. Значение partitionOverwriteMode по умолчанию — static.

Внимание

Убедитесь, что данные, записанные с помощью динамической partition, перезаписывают только ожидаемые секции. Одна строка в неправильной partition может привести к непреднамеренному перезаписи всего partition.

В следующем примере показано использование динамических замещений partition.

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Примечание.

  • Динамические partition перезаписывают конфликты с параметром replaceWhere секционированных tables.
    • Если в конфигурации сеанса Spark включена динамическая перезапись partition и replaceWhere предоставляется как параметр DataFrameWriter, тогда Delta Lake перезаписывает данные в соответствии с выражением replaceWhere (параметры, относящиеся к запросу, переопределяют конфигурации сеанса).
    • Вы получите ошибку, если у параметров DataFrameWriter активированы как динамическое partition перезаписи, так и replaceWhere.
  • Невозможно указать overwriteSchema как true при использовании динамической partition перезаписи.