Condividi tramite


Procedure consigliate per Delta Lake

Questo articolo descrive la procedura consigliata per l'uso di Delta Lake.

Databricks consiglia di usare l'ottimizzazione predittiva. Vedere Ottimizzazione predittiva per le tabelle gestite di Unity Catalog.

Se si elimina e si ricrea una tabella nella stessa posizione, è consigliabile usare sempre un'istruzione CREATE OR REPLACE TABLE. Vedere Escludere o sostituire una tabella Delta.

Rimuovere le configurazioni Delta legacy

Durante l'aggiornamento a una nuova versione di Databricks Runtime, Databricks consiglia di rimuovere le configurazioni Delta legacy più esplicite dalle configurazioni Spark e dalle proprietà della tabella. Le configurazioni legacy possono impedire l'applicazione di nuove ottimizzazioni e i valori predefiniti introdotti da Databricks ai carichi di lavoro migrati.

Usare il clustering liquido per ignorare i dati ottimizzati

Databricks consiglia di usare il clustering liquido anziché il partizionamento, l'ordine Z o altre strategie dell'organizzazione dei dati per ottimizzare il layout dei dati e ignorare i dati. Vedere Usare il clustering liquido per le tabelle Delta.

Compattare file

L'ottimizzazione predittiva esegue automaticamente OPTIMIZE e i comandi VACUUM sulle tabelle gestite di Unity Catalog. Vedere Ottimizzazione predittiva per le tabelle gestite di Unity Catalog.

Databricks consiglia di eseguire frequentemente il comando OPTIMIZE per compattare i file di piccole dimensioni.

Nota

Questa operazione non rimuove i file precedenti. Per rimuoverli, eseguire il comando VACUUM.

Sostituire il contenuto o lo schema di una tabella

In alcuni casi può essere necessario sostituire una tabella Delta. Ad esempio:

  • Si scopre che dati nella tabella non sono corretti e si desidera sostituire il contenuto.
  • Si vuole riscrivere l'intera tabella per apportare modifiche di schema incompatibili (ad esempio la modifica dei tipi di colonna).

Anche se è possibile eliminare l'intera directory di una tabella Delta e crearne una nuova nello stesso percorso, questo non è consigliabile perché:

  • L'eliminazione di una directory non è efficiente. L'eliminazione di una directory contenente file di grandi dimensioni può richiedere ore o persino giorni.
  • Si perde tutto il contenuto dei file eliminati ed è difficile eseguire il recupero se si elimina la tabella errata.
  • L'eliminazione della directory non è atomica. Durante l'eliminazione della tabella una query simultanea che legge la tabella può non riuscire oppure può visualizzare una tabella parziale.

Se non è necessario modificare lo schema della tabella, è possibile eliminare i dati da una tabella Delta e inserire i nuovi dati. In alternativa aggiornare la tabella per correggere i valori non corretti.

Se si desidera modificare lo schema della tabella, è possibile sostituire l'intera tabella in modo atomico. Ad esempio:

Python

dataframe.write \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> AS SELECT ... -- Managed table
REPLACE TABLE <your-table> LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

Questo approccio offre diversi vantaggi:

  • La sovrascrittura di una tabella è molto più veloce perché non è necessario elencare la directory in modo ricorsivo o eliminare file.
  • La versione precedente della tabella esiste ancora. Se si elimina la tabella errata, è possibile recuperare facilmente i vecchi dati usando il tempo per spostamento fisico. Vedere Usare la cronologia delle tabelle Delta Lake.
  • È un'operazione atomica. Le query simultanee possono comunque leggere la tabella durante la sua eliminazione.
  • A causa delle garanzie di transazione Delta Lake ACID, se la sovrascrittura della tabella ha esito negativo, la tabella si troverà nello stato precedente.

Inoltre, se si desidera eliminare i file obsoleti per risparmiare sui costi di archiviazione dopo la sovrascrittura della tabella, è possibile usare VACUUM per eliminarli. È ottimizzato per l'eliminazione di file ed è in genere più veloce rispetto all'eliminazione dell'intera directory.

Memorizzazione nella cache di Spark

Databricks non consiglia di usare la memorizzazione nella cache di Spark per i motivi seguenti:

  • Si perde la possibilità di ignorare i dati che possono provenire da ulteriori filtri aggiunti all'oggetto DataFrame nella cache.
  • I dati memorizzati nella cache potrebbero non essere aggiornati se si accede alla tabella usando un identificatore diverso.

Differenze tra Delta Lake e Parquet in Apache Spark

Delta Lake gestisce automaticamente le operazioni seguenti. È consigliabile non eseguire mai queste operazioni manualmente:

  • REFRESH TABLE: le tabelle Delta restituiscono sempre le informazioni più aggiornate, quindi non è necessario chiamare manualmente REFRESH TABLE dopo le modifiche.
  • Aggiungere e rimuovere partizioni: Delta Lake tiene traccia automaticamente del set di partizioni presenti in una tabella e aggiorna l'elenco man mano che i dati vengono aggiunti o rimossi. Di conseguenza, non è necessario eseguire ALTER TABLE [ADD|DROP] PARTITION o MSCK.
  • Caricare una singola partizione: la lettura diretta delle partizioni non è necessaria. Ad esempio, non è necessario eseguire spark.read.format("parquet").load("/data/date=2017-01-01"). Usare invece una clausola WHERE per ignorare i dati, ad esempio spark.read.table("<table-name>").where("date = '2017-01-01'").
  • Non modificare manualmente i file di dati: Delta Lake usa il log delle transazioni per eseguire il commit delle modifiche apportate alla tabella in modo atomico. Non modificare, aggiungere o eliminare direttamente i file di dati Parquet in una tabella Delta, perché questo potrebbe causare la perdita di dati o il danneggiamento della tabella.

Migliorare le prestazioni per il merge in Delta Lake

È possibile ridurre il tempo impiegato dal merge usando i seguenti approcci:

  • Ridurre lo spazio di ricerca per le corrispondenze: per impostazione predefinita, l'operazione merge esegue una ricerca nell'intera tabella Delta per trovare le corrispondenze nella tabella di origine. Un modo per velocizzare merge è ridurre lo spazio di ricerca aggiungendo vincoli noti nella condizione di corrispondenza. Si supponga, ad esempio, di avere una tabella partizionata da country e date, e si vuole usare merge per aggiornare le informazioni relative all'ultimo giorno e a un paese specifico. L'aggiunta della condizione seguente rende la query più veloce, perché cerca le corrispondenze solo nelle partizioni pertinenti:

    events.date = current_date() AND events.country = 'USA'
    

    Inoltre, questa query riduce anche le probabilità di conflitti con altre operazioni simultanee. Per altri dettagli vedere Livelli di isolamento e conflitti di scrittura in Azure Databricks.

  • Compattare i file: se i dati vengono archiviati in molti file di piccole dimensioni, la lettura dei dati per la ricerca di corrispondenze può diventare lenta. È possibile compattare file di piccole dimensioni in file più grandi per migliorare la produttività della lettura. Per i dettagli, vedere Ottimizzare il layout del file di dati.

  • Controllare le partizioni in ordine casuale per le scritture: l'operazione merge seleziona in ordine casuale più volte i dati nel calcolo e scrive quelli aggiornati. Il numero di task usate per la selezione in ordine casuale è controllato dalla configurazione spark.sql.shuffle.partitions della sessione Spark. L'impostazione di questo parametro non solo controlla il parallelismo, ma determina anche il numero di file di output. L'aumento del valore aumenta il parallelismo, ma genera anche un numero maggiore di file di dati più piccoli.

  • Abilitare le scritture ottimizzate: per le tabelle partizionate, merge può produrre un numero molto grande di file di piccole dimensioni rispetto al numero di partizioni casuali. Questo è dovuto al fatto che ogni task casuale può scrivere più file in più partizioni e può diventare un collo di bottiglia delle prestazioni. È possibile ridurre il numero di file abilitando le scritture ottimizzate. Si veda Scritture ottimizzate per Delta Lake in Azure Databricks.

  • Ottimizzare le dimensioni dei file nella tabella: Azure Databricks può rilevare automaticamente se una tabella Delta ha operazioni merge frequenti che riscrivono i file, e può scegliere di ridurre le dimensioni dei file riscritti in previsione di ulteriori riscritture dei file in futuro. Per ii dettagli, vedere la sezione ottimizzazione delle dimensioni del file.

  • Unione in ordine casuale basso: Unione in ordine casuale basso offre un'implementazione ottimizzata di MERGE, con prestazioni migliori per i carichi di lavoro più comuni. Inoltre, mantiene le ottimizzazioni del layout dei dati esistenti, ad esempio ordine Z sui dati non modificati.

Gestire il recency dei dati

All'inizio di ogni query, le tabelle Delta vengono aggiornate automaticamente all'ultima versione della tabella. Questo lavoro può essere osservato nei notebook quando lo stato del comando segnala: Updating the Delta table's state. Tuttavia, quando si esegue l'analisi cronologica in una tabella, potrebbero non essere necessari i dati aggiornati all'ultimo minuto, soprattutto per le tabelle in cui i dati di streaming vengono inseriti di frequente. In questi casi, le query possono essere eseguite sugli snapshot non aggiornati della tabella Delta. Questo approccio può ridurre la latenza nell'ottenere risultati dalle query.

È possibile configurare la tolleranza per i dati non aggiornati impostando la configurazione spark.databricks.delta.stalenessLimit della sessione Spark con un valore stringa di tempo, ad esempio 1h o 15m (rispettivamente per 1 ora o 15 minuti). Questa configurazione è specifica della sessione e non influisce sugli altri accessi client alla tabella. Se lo stato della tabella è stato aggiornato entro il limite di decadimento, una query sulla tabella restituisce i risultati senza attendere l'aggiornamento della tabella più recente. Questa impostazione non impedisce mai l'aggiornamento della tabella e, quando vengono restituiti dati non aggiornati, i processi di aggiornamento in background. Se l'ultimo aggiornamento della tabella è antecedente al limite di decadimento, la query non restituisce risultati fino al completamento dell'aggiornamento di stato della tabella.

Checkpoint avanzati per query a bassa latenza

Delta Lake scrive i checkpoint come stato aggregato di una tabella Delta a una frequenza ottimizzata. Questi checkpoint fungono da punto iniziale per calcolare lo stato più recente della tabella. Senza checkpoint, Delta Lake dovrà leggere un'ampia raccolta di file JSON (file "delta") che rappresenta i commit nel log delle transazioni per calcolare lo stato di una tabella. Inoltre, le statistiche a livello di colonna usate da Delta Lake per ignorare i dati vengono archiviate nel checkpoint.

Importante

I checkpoint Delta Lake sono diversi dai checkpoint di flusso strutturato. Vedere Checkpoint di flusso strutturato.

Le statistiche a livello di colonna vengono archiviate come struttura e JSON (per la compatibilità con le versioni precedenti). Il formato della struttura rende le letture Delta Lake molto più veloci, perché:

  • Delta Lake non esegue una dispendiosa analisi JSON per ottenere statistiche a livello di colonna.
  • Le funzionalità di eliminazione di colonna Parquet riducono significativamente le operazioni di I/O necessarie per leggere le statistiche per una colonna.

Il formato struttura consente una raccolta di ottimizzazioni che riducono il sovraccarico delle operazioni di lettura Delta Lake da secondi a decine di millisecondi, riducendo significativamente la latenza per le query brevi.

Gestire le statistiche a livello di colonna nei checkpoint

È possibile gestire la modalità di scrittura delle statistiche nei checkpoint usando le proprietà della tabella delta.checkpoint.writeStatsAsJson e delta.checkpoint.writeStatsAsStruct. Se entrambe le proprietà della tabella sono false, Delta Lake non può ignorare i dati sulle prestazioni.

  • Il batch scrive le statistiche di scrittura sia in formato JSON che in formato struttura. delta.checkpoint.writeStatsAsJson è .true
  • delta.checkpoint.writeStatsAsStruct è non definito per impostazione predefinita.
  • I lettori usano la colonna struttura, quando disponibile, e in caso contrario ritornano all'uso della colonna JSON.

Importante

I checkpoint avanzati non interrompono la compatibilità con i lettori Delta Lake open source. Tuttavia, l'impostazione delta.checkpoint.writeStatsAsJson per false può avere implicazioni sui lettori Delta Lake proprietari. Per altre informazioni relative alle implicazioni sulle prestazioni, contattare i fornitori di riferimento.

Abilitare checkpoint avanzati per le query di flusso strutturato

Se i carichi di lavoro di flusso strutturato non hanno requisiti di bassa latenza (latenze secondarie), è possibile abilitare checkpoint avanzati eseguendo il comando SQL seguente:

ALTER TABLE <table-name> SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

È anche possibile migliorare la latenza di scrittura del checkpoint impostando le proprietà della tabella seguenti:

ALTER TABLE <table-name> SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Se ignorare i dati non è utile, nell'applicazione, è possibile impostare entrambe le proprietà su false. Quindi non vengono raccolte o scritte statistiche. Databricks non consiglia questa configurazione.