Esecuzione di query adattive
L'esecuzione di query adattive (AQE) è la ri-ottimizzazione delle query che si verifica durante la loro esecuzione.
La motivazione per la riottimizzazione del runtime è che Azure Databricks ha le statistiche più accurate up-to-date alla fine di uno scambio casuale e broadcast (detto fase di query in AQE). Di conseguenza, Azure Databricks può scegliere una strategia fisica migliore, individuare una dimensione ottimale e un numero di partizioni post-shuffle, oppure eseguire ottimizzazioni che in precedenza richiedevano suggerimenti, come ad esempio la gestione dei join sbilanciati.
Ciò può essere molto utile quando la raccolta delle statistiche non è attivata o quando le statistiche non sono aggiornate. È utile anche in contesti in cui le statistiche derivate in modo statico sono imprecise, ad esempio nel mezzo di una query complessa o dopo l'occorrenza di distorsioni dei dati.
Funzionalità
AQE è abilitato per impostazione predefinita. Ha 4 funzionalità principali:
- Modifica dinamicamente l'ordinamento del merge join in broadcast hash join.
- Unisce dinamicamente le partizioni (combina partizioni di piccole dimensioni in partizioni di dimensioni ragionevoli) dopo lo scambio di shuffle. Le attività molto piccole hanno una velocità effettiva di I/O peggiore e tendono a soffrire di più dal sovraccarico di pianificazione e dal sovraccarico di configurazione delle attività. La combinazione di attività di piccole dimensioni consente di risparmiare risorse e migliorare la velocità effettiva del cluster.
- Gestisce dinamicamente lo sbilanciamento nel join di tipo merge sort e nel join di tipo hash shuffle suddividendo (e replicando, se necessario) le attività sbilanciate in attività di dimensioni approssimativamente uguali.
- Rileva e propaga dinamicamente le relazioni vuote.
Applicazione
AQE si applica a tutte le interrogazioni che sono:
- Non streaming
- Contiene almeno uno scambio (in genere quando è presente un join, un'aggregazione o una finestra), una sottoquery o entrambe le cose.
Non tutte le query applicate all'AQE sono necessariamente riottimizzate. La riottimizzazione potrebbe o meno fornire un piano di query diverso rispetto a quello compilato staticamente. Per determinare se il piano di una query è stato modificato da AQE, consultare la sezione sottostante Piani di query.
Piani di query
In questa sezione viene illustrato come esaminare i piani di query in modi diversi.
In questa sezione:
Interfaccia utente di Spark
nodo AdaptiveSparkPlan
Le query applicate AQE contengono uno o più nodi AdaptiveSparkPlan
, solitamente come nodo radice di ciascuna query principale o sottoquery.
Prima dell'esecuzione della query o quando è in esecuzione, il flag isFinalPlan
del nodo AdaptiveSparkPlan
corrispondente viene visualizzato come false
; al termine dell'esecuzione della query, il flag di isFinalPlan
cambia in true.
Piano in evoluzione
Il diagramma del piano di query si evolve man mano che l'esecuzione avanza e riflette il piano più recente in esecuzione. I nodi che sono già stati eseguiti (in cui sono disponibili le metriche) non cambiano, ma quelli che non sono stati eseguiti possono cambiare nel tempo a seguito di riottimizzazioni.
Di seguito è riportato un esempio di diagramma del piano di query:
DataFrame.explain()
nodo AdaptiveSparkPlan
Le query applicate AQE contengono uno o più nodi AdaptiveSparkPlan
, solitamente come nodo radice di ciascuna query principale o sottoquery. Prima dell'esecuzione della query o quando è in esecuzione, il flag isFinalPlan
del nodo AdaptiveSparkPlan
corrispondente viene visualizzato come false
; Al termine dell'esecuzione della query, il flag di isFinalPlan
cambia in true
.
Piano corrente e iniziale
In ogni nodo AdaptiveSparkPlan
sarà presente sia il piano iniziale (il piano prima di applicare le ottimizzazioni AQE) sia il piano corrente o finale, a seconda che l'esecuzione sia stata completata. Il piano corrente si evolverà man mano che l'esecuzione procede.
Statistiche di esecuzione
Ogni fase di miscelazione e broadcast contiene statistiche sui dati.
Prima o durante l'esecuzione della fase, le statistiche sono stime calcolate in fase di compilazione e il flag isRuntime
è false
, ad esempio: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
Una volta completata l'esecuzione della fase, le statistiche sono quelle raccolte durante l'esecuzione, e il flag isRuntime
diventerà true
, ad esempio: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
Di seguito è riportato un esempio di DataFrame.explain
:
Prima dell'esecuzione
Durante l'esecuzione
Dopo l'esecuzione
SQL EXPLAIN
nodo AdaptiveSparkPlan
Le query AQE applicate contengono uno o più nodi AdaptiveSparkPlan, di solito come nodo principale di ogni query principale o sottoquery.
Nessun piano attuale
Poiché SQL EXPLAIN
non esegue la query, il piano corrente è sempre uguale al piano iniziale e non riflette ciò che alla fine verrebbe eseguito da AQE.
Di seguito è riportato un esempio di SQL explain:
Efficacia
Il piano di query cambierà se una o più ottimizzazioni di AQE diventano effettive. L'effetto di queste ottimizzazioni AQE è dimostrato dalla differenza tra i piani correnti e finali e il piano iniziale e i nodi di piano specifici nei piani correnti e finali.
Modificare dinamicamente l'unione di ordinamento in broadcast hash join: nodi di join fisici diversi tra il piano corrente/finale e il piano iniziale
Unire dinamicamente le partizioni: nodo
CustomShuffleReader
con proprietàCoalesced
Gestire dinamicamente lo skew join: nodo
SortMergeJoin
con campoisSkew
come vero.Rilevare e propagare dinamicamente le relazioni vuote: parte di (o dell'intero) piano viene sostituita dal nodo LocalTableScan con il campo delle relazioni vuoto.
Configurazione
In questa sezione:
- Abilitare e disabilitare l'esecuzione di query adattive
- Abilita la miscelazione ottimizzata automaticamente
- Modificare dinamicamente il join di unione di ordinamento in un join hash broadcast
- unire dinamicamente le partizioni
- gestire dinamicamente l'asimmetria di join
- rilevare e propagare dinamicamente le relazioni vuote
Abilitare e disabilitare l'esecuzione di query adattive
Proprietà |
---|
spark.databricks.optimizer.adaptive.enabled Tipo: Boolean Se abilitare o disabilitare l'esecuzione di query adattive. Valore predefinito: true |
Abilitare il mescolamento ottimizzato automaticamente
Proprietà |
---|
Tipo: Integer Numero predefinito di partizioni da usare per il mescolamento dei dati per join o aggregazioni. L'impostazione del valore auto abilita lo shuffle ottimizzato automaticamente, che determina automaticamente questo numero in base al piano di query e alle dimensioni dei dati di input della query.Nota: Per Structured Streaming, questa configurazione non può essere modificata tra i riavvii della query dalla stessa posizione del checkpoint. Valore predefinito: 200 |
Modificare dinamicamente l'unione di ordinamento in broadcast hash join
Proprietà |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold Tipo: Byte String Soglia per attivare il passaggio al join di trasmissione in fase di esecuzione. Valore predefinito: 30MB |
Unire dinamicamente le partizioni
Proprietà |
---|
Tipo: Boolean Se abilitare o disabilitare la coalescenza delle partizioni. Valore predefinito: true |
Tipo: Byte String Dimensione obiettivo dopo la fusione. Le dimensioni delle partizioni unite saranno vicine, ma non superiori a quella dimensione obiettivo. Valore predefinito: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize Tipo: Byte String Dimensioni minime delle partizioni dopo l'unione. Le dimensioni delle partizioni unite non saranno inferiori a questa grandezza. Valore predefinito: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum Tipo: Integer Numero minimo di partizioni dopo l'unione. Non consigliato, perché l'impostazione sostituisce esplicitamente spark.sql.adaptive.coalescePartitions.minPartitionSize .Valore predefinito: 2x numero di core del cluster |
Gestire dinamicamente il join sbilanciato
Proprietà |
---|
spark.sql.adaptive.skewJoin.enabled Tipo: Boolean Se abilitare o disabilitare la gestione dell'asimmetria dei join. Valore predefinito: true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor Tipo: Integer Fattore che, se moltiplicato per la dimensione mediana della partizione, contribuisce a determinare se una partizione è sbilanciata. Valore predefinito: 5 |
Tipo: Byte String Soglia che contribuisce a determinare se una partizione è asimmetrica. Valore predefinito: 256MB |
Una partizione viene considerata asimmetrica quando sia (partition size > skewedPartitionFactor * median partition size)
che (partition size > skewedPartitionThresholdInBytes)
sono true
.
Rilevare e propagare dinamicamente le relazioni vuote
Proprietà |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled Tipo: Boolean Se abilitare o disabilitare la propagazione dinamica delle relazioni vuote. Valore predefinito: true |
Domande frequenti
In questa sezione:
- Perché AQE non ha diffuso una piccola tabella di join?
- È consigliabile usare comunque un hint per la strategia di join broadcast con AQE abilitato?
- Qual è la differenza tra il suggerimento per il join disallineato e l'ottimizzazione del join disallineato AQE? Quale dovrei usare?
- Perché AQE non ha modificato automaticamente l'ordinamento dei join?
- Perché AQE non ha rilevato l'asimmetria dei dati?
Perché AQE non ha trasmesso una piccola tabella join?
Se la dimensione della relazione prevista per la trasmissione scende al di sotto di questa soglia, ma non è ancora trasmessa:
- Controllare il tipo di join. La trasmissione non è supportata per determinati tipi di join, ad esempio la relazione sinistra di un
LEFT OUTER JOIN
non può essere trasmessa. - Può anche essere che la relazione contenga molte partizioni vuote, nel qual caso la maggior parte delle attività può terminare rapidamente con il join di ordinamento e fusione o può essere potenzialmente ottimizzata con la gestione dello skew join. AQE evita di modificare tali merge join di ordinamento per trasmettere hash join se la percentuale di partizioni non vuote è inferiore a
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
.
Dovrei ancora usare un hint per la strategia di join broadcast con AQE abilitato?
Sì. Un join broadcast pianificato in modo statico è generalmente più efficiente di uno pianificato dinamicamente da AQE, poiché AQE potrebbe non passare al join broadcast fino a dopo aver eseguito lo shuffle per entrambi i lati del join, momento in cui le dimensioni effettive delle relazioni sono ottenute. Pertanto, l'uso di un suggerimento di trasmissione può comunque essere una buona scelta se si conosce bene la propria query. AQE rispetterà i suggerimenti di query allo stesso modo dell'ottimizzazione statica, ma può comunque applicare ottimizzazioni dinamiche non influenzate dai suggerimenti.
Qual è la differenza tra il suggerimento per il join disallineato e l'ottimizzazione del join disallineato AQE? Quale dovrei usare?
È consigliabile basarsi sulla gestione del join asimmetrico di AQE anziché usare l'hint di join asimmetrico, perché il join asimmetrico di AQE è completamente automatico e in generale offre prestazioni migliori rispetto alla controparte hint.
Perché AQE non ha modificato automaticamente l'ordine dei join?
Il riordinamento del join dinamico non fa parte di AQE.
Perché AQE non ha rilevato un'asimmetria dei dati?
Esistono due condizioni di dimensione che devono essere soddisfatte affinché AQE rilevi una partizione come partizione asimmetrica:
- Le dimensioni della partizione sono maggiori del
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(impostazione predefinita 256 MB) - La dimensione della partizione è maggiore della dimensione mediana di tutte le partizioni moltiplicata per il fattore di partizione asimmetrico
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(impostazione predefinita 5)
Inoltre, il supporto della gestione delle asimmetrie è limitato per determinati tipi di join; ad esempio, in LEFT OUTER JOIN
, è possibile ottimizzare solo l'asimmetria sul lato sinistro.
Eredità
Il termine "Esecuzione adattiva" esiste da Spark 1.6, ma il nuovo AQE in Spark 3.0 è fondamentalmente diverso. In termini di funzionalità, Spark 1.6 esegue solo la parte "unione dinamica delle partizioni". In termini di architettura tecnica, il nuovo AQE è un framework di pianificazione dinamica e riorganizzazione delle query in base alle statistiche di runtime, che supporta un'ampia gamma di ottimizzazioni, ad esempio quelle descritte in questo articolo e può essere esteso per consentire un maggior numero di ottimizzazioni potenziali.