Sovrascrivere in modo selettivo i dati con Delta Lake
Azure Databricks sfrutta la funzionalità Delta Lake per supportare due opzioni distinte per sovrascrizioni selettive:
- L'opzione
replaceWhere
sostituisce atomicamente tutti i record che corrispondono a un predicato specificato. - È possibile sostituire le directory dei dati in base a come sono partizionate le tables utilizzando sovrascrizioni dinamiche di partition.
Per la maggior parte delle operazioni, Databricks consiglia di usare replaceWhere
per specificare i dati da sovrascrivere.
Importante
Se i dati sono stati sovrascritti accidentalmente, è possibile usare restore per annullare la modifica.
Sovrascrittura selettiva arbitraria con replaceWhere
È possibile sovrascrivere in modo selettivo solo i dati corrispondenti a un'espressione arbitraria.
Nota
SQL richiede Databricks Runtime 12.2 LTS o versione successiva.
Il comando seguente sostituisce in modo atomico gli eventi di gennaio nell'tabledi destinazione , partizionati da start_date
, con i dati in replace_data
:
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Questo codice di esempio scrive i dati in replace_data
, convalida che tutte le righe corrispondano al predicato ed esegue una sostituzione atomica usando overwrite
la semantica. Se uno o più values nell'ambito dell'operazione sono al di fuori del constraint, l'operazione fallisce con un errore per impostazione predefinita.
È possibile modificare questo comportamento impostando overwrite
values all'interno dell'intervallo di predicati e insert
record che non rientrano nell'intervallo specificato. A tale scopo, disabilitare il controllo constraint impostando spark.databricks.delta.replaceWhere.constraintCheck.enabled
su false usando una delle impostazioni seguenti:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Comportamento legacy
Il comportamento predefinito legacy era che replaceWhere
sovrascriveva i dati corrispondenti a un predicato solo su partitioncolumns. Con questo modello legacy, il comando seguente sostituisce automaticamente il mese di gennaio nel tabledi destinazione, suddiviso tramite date
, utilizzando i dati in df
:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Se si vuole eseguire il fallback al comportamento precedente, è possibile disabilitare il spark.databricks.delta.replaceWhere.dataColumns.enabled
flag:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
sovrascritture dinamiche partition
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Databricks Runtime 11.3 LTS e versioni successive supporta modalità di sovrascrittura dinamicapartition per i tablespartizionati. Per tables con più partizioni, Databricks Runtime 11.3 LTS e versioni precedenti supportano le sovrascritture dinamiche di partition solo se tutte le partitioncolumns sono dello stesso tipo di dati.
Quando si è in modalità di sovrascrittura dinamica partition, le operazioni sovrascrivono tutti i dati esistenti in ogni partition logica per cui la scrittura commette nuovi dati. Tutte le partizioni logiche esistenti per le quali la scrittura non contiene dati rimangono invariati. Questa modalità è applicabile solo quando i dati vengono scritti in modalità di sovrascrittura: INSERT OVERWRITE
in SQL o in una scrittura di dataframe con df.write.mode("overwrite")
.
Configurare la modalità di sovrascrittura partition dinamica impostando la configurazione della sessione Spark spark.sql.sources.partitionOverwriteMode
su dynamic
. È anche possibile abilitare questa opzione impostando l'opzione DataFrameWriter
partitionOverwriteMode
su dynamic
. Se presente, l'opzione specifica della query sostituisce la modalità definita nella configurazione della sessione. L'impostazione predefinita per partitionOverwriteMode
è static
.
Importante
Verificare che i dati scritti con partition dinamici sovrascrivono solo le partizioni previste. Una singola riga errata nell'partition può portare alla sovrascrittura involontaria di un intero partition.
L'esempio seguente dimostra l'uso di sovrascritture dinamiche partition:
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Nota
- La sovrascrittura dinamica di partition è in conflitto con l'opzione
replaceWhere
per i tablespartizionati.- Se la sovrascrittura partition dinamica è abilitata nella configurazione della sessione Spark e
replaceWhere
viene fornita come opzioneDataFrameWriter
, Delta Lake sovrascrive i dati in base all'espressionereplaceWhere
(le opzioni specifiche delle query sostituiscono le configurazioni della sessione). - Viene visualizzato un errore se le opzioni di
DataFrameWriter
hanno entrambe la sovrascrittura dinamica di partition ereplaceWhere
abilitate.
- Se la sovrascrittura partition dinamica è abilitata nella configurazione della sessione Spark e
- Non è possibile specificare
overwriteSchema
cometrue
quando si usa la sovrascrittura dinamica di partition.