Condividi tramite


Esecuzione di query adattive

L'esecuzione di query adattive (AQE) è la ri-ottimizzazione delle query che si verifica durante la loro esecuzione.

La motivazione per la riottimizzazione del runtime è che Azure Databricks ha le statistiche più accurate up-to-date alla fine di uno scambio casuale e broadcast (detto fase di query in AQE). Di conseguenza, Azure Databricks può scegliere una strategia fisica migliore, individuare una dimensione ottimale e un numero di partizioni post-shuffle, oppure eseguire ottimizzazioni che in precedenza richiedevano suggerimenti, come ad esempio la gestione dei join sbilanciati.

Ciò può essere molto utile quando la raccolta delle statistiche non è attivata o quando le statistiche non sono aggiornate. È utile anche in contesti in cui le statistiche derivate in modo statico sono imprecise, ad esempio nel mezzo di una query complessa o dopo l'occorrenza di distorsioni dei dati.

Funzionalità

AQE è abilitato per impostazione predefinita. Ha 4 funzionalità principali:

  • Modifica dinamicamente l'ordinamento del merge join in broadcast hash join.
  • Unisce dinamicamente le partizioni (combina partizioni di piccole dimensioni in partizioni di dimensioni ragionevoli) dopo lo scambio di shuffle. Le attività molto piccole hanno una velocità effettiva di I/O peggiore e tendono a soffrire di più dal sovraccarico di pianificazione e dal sovraccarico di configurazione delle attività. La combinazione di attività di piccole dimensioni consente di risparmiare risorse e migliorare la velocità effettiva del cluster.
  • Gestisce dinamicamente lo sbilanciamento nel join di tipo merge sort e nel join di tipo hash shuffle suddividendo (e replicando, se necessario) le attività sbilanciate in attività di dimensioni approssimativamente uguali.
  • Rileva e propaga dinamicamente le relazioni vuote.

Applicazione

AQE si applica a tutte le interrogazioni che sono:

  • Non streaming
  • Contiene almeno uno scambio (in genere quando è presente un join, un'aggregazione o una finestra), una sottoquery o entrambe le cose.

Non tutte le query applicate all'AQE sono necessariamente riottimizzate. La riottimizzazione potrebbe o meno fornire un piano di query diverso rispetto a quello compilato staticamente. Per determinare se il piano di una query è stato modificato da AQE, consultare la sezione sottostante Piani di query.

Piani di query

In questa sezione viene illustrato come esaminare i piani di query in modi diversi.

In questa sezione:

Interfaccia utente di Spark

nodo AdaptiveSparkPlan

Le query applicate AQE contengono uno o più nodi AdaptiveSparkPlan, solitamente come nodo radice di ciascuna query principale o sottoquery. Prima dell'esecuzione della query o quando è in esecuzione, il flag isFinalPlan del nodo AdaptiveSparkPlan corrispondente viene visualizzato come false; al termine dell'esecuzione della query, il flag di isFinalPlan cambia in true.

Piano in evoluzione

Il diagramma del piano di query si evolve man mano che l'esecuzione avanza e riflette il piano più recente in esecuzione. I nodi che sono già stati eseguiti (in cui sono disponibili le metriche) non cambiano, ma quelli che non sono stati eseguiti possono cambiare nel tempo a seguito di riottimizzazioni.

Di seguito è riportato un esempio di diagramma del piano di query:

diagramma dello schema di query

DataFrame.explain()

nodo AdaptiveSparkPlan

Le query applicate AQE contengono uno o più nodi AdaptiveSparkPlan, solitamente come nodo radice di ciascuna query principale o sottoquery. Prima dell'esecuzione della query o quando è in esecuzione, il flag isFinalPlan del nodo AdaptiveSparkPlan corrispondente viene visualizzato come false; Al termine dell'esecuzione della query, il flag di isFinalPlan cambia in true.

Piano corrente e iniziale

In ogni nodo AdaptiveSparkPlan sarà presente sia il piano iniziale (il piano prima di applicare le ottimizzazioni AQE) sia il piano corrente o finale, a seconda che l'esecuzione sia stata completata. Il piano corrente si evolverà man mano che l'esecuzione procede.

Statistiche di esecuzione

Ogni fase di miscelazione e broadcast contiene statistiche sui dati.

Prima o durante l'esecuzione della fase, le statistiche sono stime calcolate in fase di compilazione e il flag isRuntime è false, ad esempio: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Una volta completata l'esecuzione della fase, le statistiche sono quelle raccolte durante l'esecuzione, e il flag isRuntime diventerà true, ad esempio: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

Di seguito è riportato un esempio di DataFrame.explain:

  • Prima dell'esecuzione

    Prima dell'esecuzione

  • Durante l'esecuzione

    Durante l'esecuzione

  • Dopo l'esecuzione

    Dopo l'esecuzione

SQL EXPLAIN

nodo AdaptiveSparkPlan

Le query AQE applicate contengono uno o più nodi AdaptiveSparkPlan, di solito come nodo principale di ogni query principale o sottoquery.

Nessun piano attuale

Poiché SQL EXPLAIN non esegue la query, il piano corrente è sempre uguale al piano iniziale e non riflette ciò che alla fine verrebbe eseguito da AQE.

Di seguito è riportato un esempio di SQL explain:

SQL explain

Efficacia

Il piano di query cambierà se una o più ottimizzazioni di AQE diventano effettive. L'effetto di queste ottimizzazioni AQE è dimostrato dalla differenza tra i piani correnti e finali e il piano iniziale e i nodi di piano specifici nei piani correnti e finali.

  • Modificare dinamicamente l'unione di ordinamento in broadcast hash join: nodi di join fisici diversi tra il piano corrente/finale e il piano iniziale

    stringa della strategia di join

  • Unire dinamicamente le partizioni: nodo CustomShuffleReader con proprietà Coalesced

    lettore shuffle personalizzato

    stringa lettore casuale personalizzata

  • Gestire dinamicamente lo skew join: nodo SortMergeJoin con campo isSkew come vero.

    piano di join asimmetria

    stringa di join asimmetria

  • Rilevare e propagare dinamicamente le relazioni vuote: parte di (o dell'intero) piano viene sostituita dal nodo LocalTableScan con il campo delle relazioni vuoto.

    analisi della tabella locale

    stringa di scansione locale della tabella

Configurazione

In questa sezione:

Abilitare e disabilitare l'esecuzione di query adattive

Proprietà
spark.databricks.optimizer.adaptive.enabled

Tipo: Boolean

Se abilitare o disabilitare l'esecuzione di query adattive.

Valore predefinito: true

Abilitare il mescolamento ottimizzato automaticamente

Proprietà
spark.sql.shuffle.partitions

Tipo: Integer

Numero predefinito di partizioni da usare per il mescolamento dei dati per join o aggregazioni. L'impostazione del valore auto abilita lo shuffle ottimizzato automaticamente, che determina automaticamente questo numero in base al piano di query e alle dimensioni dei dati di input della query.

Nota: Per Structured Streaming, questa configurazione non può essere modificata tra i riavvii della query dalla stessa posizione del checkpoint.

Valore predefinito: 200

Modificare dinamicamente l'unione di ordinamento in broadcast hash join

Proprietà
spark.databricks.adaptive.autoBroadcastJoinThreshold

Tipo: Byte String

Soglia per attivare il passaggio al join di trasmissione in fase di esecuzione.

Valore predefinito: 30MB

Unire dinamicamente le partizioni

Proprietà
spark.sql.adaptive.coalescePartitions.enabled

Tipo: Boolean

Se abilitare o disabilitare la coalescenza delle partizioni.

Valore predefinito: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Tipo: Byte String

Dimensione obiettivo dopo la fusione. Le dimensioni delle partizioni unite saranno vicine, ma non superiori a quella dimensione obiettivo.

Valore predefinito: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Tipo: Byte String

Dimensioni minime delle partizioni dopo l'unione. Le dimensioni delle partizioni unite non saranno inferiori a questa grandezza.

Valore predefinito: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Tipo: Integer

Numero minimo di partizioni dopo l'unione. Non consigliato, perché l'impostazione sostituisce esplicitamente
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Valore predefinito: 2x numero di core del cluster

Gestire dinamicamente il join sbilanciato

Proprietà
spark.sql.adaptive.skewJoin.enabled

Tipo: Boolean

Se abilitare o disabilitare la gestione dell'asimmetria dei join.

Valore predefinito: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

Tipo: Integer

Fattore che, se moltiplicato per la dimensione mediana della partizione, contribuisce a determinare se una partizione è sbilanciata.

Valore predefinito: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Tipo: Byte String

Soglia che contribuisce a determinare se una partizione è asimmetrica.

Valore predefinito: 256MB

Una partizione viene considerata asimmetrica quando sia (partition size > skewedPartitionFactor * median partition size) che (partition size > skewedPartitionThresholdInBytes) sono true.

Rilevare e propagare dinamicamente le relazioni vuote

Proprietà
spark.databricks.adaptive.emptyRelationPropagation.enabled

Tipo: Boolean

Se abilitare o disabilitare la propagazione dinamica delle relazioni vuote.

Valore predefinito: true

Domande frequenti

In questa sezione:

Perché AQE non ha trasmesso una piccola tabella join?

Se la dimensione della relazione prevista per la trasmissione scende al di sotto di questa soglia, ma non è ancora trasmessa:

  • Controllare il tipo di join. La trasmissione non è supportata per determinati tipi di join, ad esempio la relazione sinistra di un LEFT OUTER JOIN non può essere trasmessa.
  • Può anche essere che la relazione contenga molte partizioni vuote, nel qual caso la maggior parte delle attività può terminare rapidamente con il join di ordinamento e fusione o può essere potenzialmente ottimizzata con la gestione dello skew join. AQE evita di modificare tali merge join di ordinamento per trasmettere hash join se la percentuale di partizioni non vuote è inferiore a spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Dovrei ancora usare un hint per la strategia di join broadcast con AQE abilitato?

Sì. Un join broadcast pianificato in modo statico è generalmente più efficiente di uno pianificato dinamicamente da AQE, poiché AQE potrebbe non passare al join broadcast fino a dopo aver eseguito lo shuffle per entrambi i lati del join, momento in cui le dimensioni effettive delle relazioni sono ottenute. Pertanto, l'uso di un suggerimento di trasmissione può comunque essere una buona scelta se si conosce bene la propria query. AQE rispetterà i suggerimenti di query allo stesso modo dell'ottimizzazione statica, ma può comunque applicare ottimizzazioni dinamiche non influenzate dai suggerimenti.

Qual è la differenza tra il suggerimento per il join disallineato e l'ottimizzazione del join disallineato AQE? Quale dovrei usare?

È consigliabile basarsi sulla gestione del join asimmetrico di AQE anziché usare l'hint di join asimmetrico, perché il join asimmetrico di AQE è completamente automatico e in generale offre prestazioni migliori rispetto alla controparte hint.

Perché AQE non ha modificato automaticamente l'ordine dei join?

Il riordinamento del join dinamico non fa parte di AQE.

Perché AQE non ha rilevato un'asimmetria dei dati?

Esistono due condizioni di dimensione che devono essere soddisfatte affinché AQE rilevi una partizione come partizione asimmetrica:

  • Le dimensioni della partizione sono maggiori del spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (impostazione predefinita 256 MB)
  • La dimensione della partizione è maggiore della dimensione mediana di tutte le partizioni moltiplicata per il fattore di partizione asimmetrico spark.sql.adaptive.skewJoin.skewedPartitionFactor (impostazione predefinita 5)

Inoltre, il supporto della gestione delle asimmetrie è limitato per determinati tipi di join; ad esempio, in LEFT OUTER JOIN, è possibile ottimizzare solo l'asimmetria sul lato sinistro.

Eredità

Il termine "Esecuzione adattiva" esiste da Spark 1.6, ma il nuovo AQE in Spark 3.0 è fondamentalmente diverso. In termini di funzionalità, Spark 1.6 esegue solo la parte "unione dinamica delle partizioni". In termini di architettura tecnica, il nuovo AQE è un framework di pianificazione dinamica e riorganizzazione delle query in base alle statistiche di runtime, che supporta un'ampia gamma di ottimizzazioni, ad esempio quelle descritte in questo articolo e può essere esteso per consentire un maggior numero di ottimizzazioni potenziali.