Selectively overwrite data with Delta Lake

Azure Databricks leverages Delta Lake functionality to support two distinct options for selective overwrites:

  • The replaceWhere option atomically replaces all records that match a given predicate.
  • You can replace directories of data based on how tables are partitioned using dynamic partition overwrites.

For most operations, Databricks recommends using replaceWhere to specify which data to overwrite.

Important

If data has been accidentally overwritten, you can use restore to undo the change.
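
As a rough sketch of that recovery path, the helper below issues a RESTORE TABLE ... TO VERSION AS OF statement through an existing Spark session. The helper name restore_table_to_version, the table name, and the version number are illustrative assumptions, not part of any Databricks API. Run DESCRIBE HISTORY on the table first to find the version that precedes the accidental overwrite.

```python
def restore_table_to_version(spark, table_name: str, version: int) -> str:
    """Restore a Delta table to an earlier version and return the SQL issued.

    `spark` is assumed to be an active SparkSession, as provided in a
    Databricks notebook. This helper is hypothetical, not a built-in API.
    """
    if version < 0:
        raise ValueError("version must be a non-negative integer")
    statement = f"RESTORE TABLE {table_name} TO VERSION AS OF {int(version)}"
    spark.sql(statement)
    return statement


# Example call in a notebook (table name and version are placeholders):
# restore_table_to_version(spark, "events", 12)
```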

Arbitrary selective overwrite with replaceWhere

You can selectively overwrite only the data that matches an arbitrary expression.

Note

The SQL syntax requires Databricks Runtime 12.2 LTS or above.

The following command atomically replaces events in January in the target table, which is partitioned by start_date, with the data in replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

This sample code writes out the data in replace_data, validates that all rows match the predicate, and performs an atomic replacement using overwrite semantics. If any rows in replace_data fall outside the predicate, the operation fails with an error by default.
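
To make these semantics concrete, here is a minimal plain-Python model that operates on in-memory lists of rows. It is a sketch of the behavior described here, not how Delta Lake implements it. The function replace_where, its constraint_check parameter, and the sample rows are assumptions made for illustration.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]


def replace_where(
    target: List[Row],
    new_rows: List[Row],
    predicate: Callable[[Row], bool],
    constraint_check: bool = True,
) -> List[Row]:
    """Plain-Python model of an overwrite that uses replaceWhere."""
    if constraint_check:
        # Default behavior: every incoming row must match the predicate.
        offending = [row for row in new_rows if not predicate(row)]
        if offending:
            raise ValueError(
                f"{len(offending)} row(s) fall outside the replaceWhere predicate"
            )
    # Drop existing rows that match the predicate, then add the new rows.
    kept = [row for row in target if not predicate(row)]
    return kept + new_rows


def in_january(row: Row) -> bool:
    # ISO-8601 date strings compare correctly as plain strings.
    return row["start_date"] >= "2017-01-01" and row["end_date"] <= "2017-01-31"


events = [
    {"id": "a", "start_date": "2017-01-05", "end_date": "2017-01-06"},
    {"id": "b", "start_date": "2017-02-10", "end_date": "2017-02-11"},
]
replace_data = [
    {"id": "c", "start_date": "2017-01-20", "end_date": "2017-01-21"},
]

result = replace_where(events, replace_data, in_january)
print([row["id"] for row in result])  # ['b', 'c']
```

Row a matches the predicate and is replaced, row b is outside the predicate and is kept, and row c is written. Passing constraint_check=False in this model skips the validation, so rows outside the predicate are inserted instead of raising an error.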

You can change this behavior to overwrite values within the predicate range and insert records that fall outside the specified range. To do so, disable the constraint check by setting spark.databricks.delta.replaceWhere.constraintCheck.enabled to false using one of the following settings:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
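
Because this setting applies to the whole Spark session, you might want to limit it to a single write. The following sketch wraps the change in a context manager that restores the previous value afterwards. The name temporary_conf is hypothetical, and the sketch assumes that spark.conf exposes get(key, default), set, and unset, as the PySpark RuntimeConfig does.

```python
from contextlib import contextmanager

CONSTRAINT_CHECK_KEY = "spark.databricks.delta.replaceWhere.constraintCheck.enabled"


@contextmanager
def temporary_conf(spark, key: str, value: str):
    """Set a Spark session configuration only for the duration of a with block.

    Hypothetical helper: `spark` is assumed to be an active SparkSession.
    """
    previous = spark.conf.get(key, None)  # None when the key was never set
    spark.conf.set(key, value)
    try:
        yield
    finally:
        # Put the session back the way it was, even if the write failed.
        if previous is None:
            spark.conf.unset(key)
        else:
            spark.conf.set(key, previous)


# Example use in a notebook (replace_data and events come from the example above):
# with temporary_conf(spark, CONSTRAINT_CHECK_KEY, "false"):
#     (replace_data.write
#       .mode("overwrite")
#       .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
#       .saveAsTable("events"))
```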

Legacy behavior

In the legacy default behavior, replaceWhere could overwrite only data matching a predicate over partition columns. Under this legacy model, the following command atomically replaces the month of January in the target table, which is partitioned by birthDate, with the data in df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")

To fall back to the legacy behavior, set the spark.databricks.delta.replaceWhere.dataColumns.enabled flag to false:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Dynamic partition overwrites

Important

This feature is in Public Preview.

Databricks Runtime 11.3 LTS and above supports dynamic partition overwrite mode for partitioned tables. For tables with multiple partition columns, Databricks Runtime 11.3 LTS and below supports dynamic partition overwrites only if all partition columns are of the same data type.

When in dynamic partition overwrite mode, operations overwrite all existing data in each logical partition for which the write commits new data. Any existing logical partitions for which the write does not contain data remain unchanged. This mode is only applicable when data is being written in overwrite mode: either INSERT OVERWRITE in SQL, or a DataFrame write with df.write.mode("overwrite").

Configure dynamic partition overwrite mode by setting the Spark session configuration spark.sql.sources.partitionOverwriteMode to dynamic. You can also enable this by setting the DataFrameWriter option partitionOverwriteMode to dynamic. If present, the query-specific option overrides the mode defined in the session configuration. The default for partitionOverwriteMode is static.

Important

Validate that the data written with dynamic partition overwrite touches only the expected partitions. A single row in the incorrect partition can lead to unintentionally overwriting an entire partition.
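
The following plain-Python model illustrates why this validation matters. It is a sketch of the semantics only, not Delta Lake's implementation. The functions dynamic_partition_overwrite and unexpected_partitions, the region partition column, and the sample rows are all hypothetical.

```python
from typing import Dict, List, Set

Row = Dict[str, str]


def dynamic_partition_overwrite(
    existing: List[Row], incoming: List[Row], partition_col: str
) -> List[Row]:
    """Replace every partition that receives new rows; keep the others."""
    touched = {row[partition_col] for row in incoming}
    untouched_rows = [row for row in existing if row[partition_col] not in touched]
    return untouched_rows + incoming


def unexpected_partitions(
    incoming: List[Row], partition_col: str, expected: Set[str]
) -> Set[str]:
    """Return the partitions that the write would touch but that were not expected."""
    return {row[partition_col] for row in incoming} - set(expected)


existing = [
    {"id": "1", "region": "east"},
    {"id": "2", "region": "east"},
    {"id": "3", "region": "west"},
    {"id": "4", "region": "west"},
    {"id": "7", "region": "north"},
]
# The write is meant to refresh only "east", but one stray "west" row slipped in.
incoming = [
    {"id": "5", "region": "east"},
    {"id": "6", "region": "west"},
]

print(unexpected_partitions(incoming, "region", {"east"}))  # {'west'}

result = dynamic_partition_overwrite(existing, incoming, "region")
print([row["id"] for row in result])  # ['7', '5', '6']
```

In this model the single stray row with id 6 causes rows 3 and 4 to be dropped along with the intended east partition, while the north partition is untouched. Checking the set of touched partitions before the write catches the problem.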

The following example demonstrates using dynamic partition overwrites:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Note

  • Dynamic partition overwrite conflicts with the option replaceWhere for partitioned tables.
    • If dynamic partition overwrite is enabled in the Spark session configuration, and replaceWhere is provided as a DataFrameWriter option, then Delta Lake overwrites the data according to the replaceWhere expression (query-specific options override session configurations).
    • You receive an error if the DataFrameWriter options have both dynamic partition overwrite and replaceWhere enabled.
  • You cannot specify overwriteSchema as true when using dynamic partition overwrite.
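
The precedence rules in this note can be summarized with a small decision function. This is a sketch that models only the rules listed above for a partitioned table. The function name resolve_overwrite_behavior and its parameters are hypothetical, and the error messages are not the ones Delta Lake produces.

```python
from typing import Optional


def resolve_overwrite_behavior(
    session_mode: str = "static",
    writer_mode: Optional[str] = None,
    replace_where: Optional[str] = None,
    overwrite_schema: bool = False,
) -> str:
    """Return 'replaceWhere', 'dynamic', or 'static', or raise on a conflict.

    session_mode models spark.sql.sources.partitionOverwriteMode, and
    writer_mode models the DataFrameWriter option partitionOverwriteMode.
    """
    session = session_mode.lower()
    writer = writer_mode.lower() if writer_mode is not None else None

    # Both set as DataFrameWriter options: this combination is an error.
    if writer == "dynamic" and replace_where is not None:
        raise ValueError(
            "partitionOverwriteMode=dynamic and replaceWhere cannot both be writer options"
        )
    # A query-specific replaceWhere overrides a session-level dynamic mode.
    if replace_where is not None:
        return "replaceWhere"
    # Otherwise the writer option, if present, overrides the session setting.
    effective = writer if writer is not None else session
    if effective == "dynamic" and overwrite_schema:
        raise ValueError(
            "overwriteSchema cannot be true with dynamic partition overwrite"
        )
    return effective


print(resolve_overwrite_behavior(session_mode="dynamic",
                                 replace_where="start_date >= '2017-01-01'"))  # replaceWhere
print(resolve_overwrite_behavior(session_mode="dynamic"))  # dynamic
print(resolve_overwrite_behavior(session_mode="dynamic", writer_mode="static"))  # static
```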