Condividi tramite


Sovrascrivere in modo selettivo i dati con Delta Lake

Azure Databricks sfrutta la funzionalità Delta Lake per supportare due opzioni distinte per sovrascrizioni selettive:

  • L'opzione replaceWhere sostituisce atomicamente tutti i record che corrispondono a un predicato specificato.
  • È possibile sostituire le directory dei dati in base a come sono partizionate le tables utilizzando sovrascrizioni dinamiche di partition.

Per la maggior parte delle operazioni, Databricks consiglia di usare replaceWhere per specificare i dati da sovrascrivere.

Importante

Se i dati sono stati sovrascritti accidentalmente, è possibile usare restore per annullare la modifica.

Sovrascrittura selettiva arbitraria con replaceWhere

È possibile sovrascrivere in modo selettivo solo i dati corrispondenti a un'espressione arbitraria.

Nota

SQL richiede Databricks Runtime 12.2 LTS o versione successiva.

Il comando seguente sostituisce in modo atomico gli eventi di gennaio nell'tabledi destinazione , partizionati da start_date, con i dati in replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Questo codice di esempio scrive i dati in replace_data, convalida che tutte le righe corrispondano al predicato ed esegue una sostituzione atomica usando overwrite la semantica. Se uno o più values nell'ambito dell'operazione sono al di fuori del constraint, l'operazione fallisce con un errore per impostazione predefinita.

È possibile modificare questo comportamento impostando overwritevalues all'interno dell'intervallo di predicati e insert record che non rientrano nell'intervallo specificato. A tale scopo, disabilitare il controllo constraint impostando spark.databricks.delta.replaceWhere.constraintCheck.enabled su false usando una delle impostazioni seguenti:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Comportamento legacy

Il comportamento predefinito legacy era che replaceWhere sovrascriveva i dati corrispondenti a un predicato solo su partitioncolumns. Con questo modello legacy, il comando seguente sostituisce automaticamente il mese di gennaio nel tabledi destinazione, suddiviso tramite date, utilizzando i dati in df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Se si vuole eseguire il fallback al comportamento precedente, è possibile disabilitare il spark.databricks.delta.replaceWhere.dataColumns.enabled flag:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

sovrascritture dinamiche partition

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Databricks Runtime 11.3 LTS e versioni successive supporta modalità di sovrascrittura dinamicapartition per i tablespartizionati. Per tables con più partizioni, Databricks Runtime 11.3 LTS e versioni precedenti supportano le sovrascritture dinamiche di partition solo se tutte le partitioncolumns sono dello stesso tipo di dati.

Quando si è in modalità di sovrascrittura dinamica partition, le operazioni sovrascrivono tutti i dati esistenti in ogni partition logica per cui la scrittura commette nuovi dati. Tutte le partizioni logiche esistenti per le quali la scrittura non contiene dati rimangono invariati. Questa modalità è applicabile solo quando i dati vengono scritti in modalità di sovrascrittura: INSERT OVERWRITE in SQL o in una scrittura di dataframe con df.write.mode("overwrite").

Configurare la modalità di sovrascrittura partition dinamica impostando la configurazione della sessione Spark spark.sql.sources.partitionOverwriteMode su dynamic. È anche possibile abilitare questa opzione impostando l'opzione DataFrameWriterpartitionOverwriteMode su dynamic. Se presente, l'opzione specifica della query sostituisce la modalità definita nella configurazione della sessione. L'impostazione predefinita per partitionOverwriteMode è static.

Importante

Verificare che i dati scritti con partition dinamici sovrascrivono solo le partizioni previste. Una singola riga errata nell'partition può portare alla sovrascrittura involontaria di un intero partition.

L'esempio seguente dimostra l'uso di sovrascritture dinamiche partition:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Nota

  • La sovrascrittura dinamica di partition è in conflitto con l'opzione replaceWhere per i tablespartizionati.
    • Se la sovrascrittura partition dinamica è abilitata nella configurazione della sessione Spark e replaceWhere viene fornita come opzione DataFrameWriter, Delta Lake sovrascrive i dati in base all'espressione replaceWhere (le opzioni specifiche delle query sostituiscono le configurazioni della sessione).
    • Viene visualizzato un errore se le opzioni di DataFrameWriter hanno entrambe la sovrascrittura dinamica di partition e replaceWhere abilitate.
  • Non è possibile specificare overwriteSchema come true quando si usa la sovrascrittura dinamica di partition.