Adaptieve queryuitvoering
Adaptieve queryuitvoering (AQE) is heroptimalisatie van query's die wordt gedaan tijdens het uitvoeren van query's.
De motivatie voor het opnieuw optimaliseren van runtimes is dat Azure Databricks de meest actuele nauwkeurige statistieken heeft aan het einde van een shuffle- en broadcast-uitwisseling (ook wel een queryfase in AQE genoemd). Als gevolg hiervan kan Azure Databricks kiezen voor een betere fysieke strategie, een optimale partitiegrootte en -nummer na willekeurige volgorde kiezen of optimalisaties uitvoeren die worden gebruikt om hints te vereisen, bijvoorbeeld scheeftrekken van joinafhandeling.
Dit kan zeer nuttig zijn wanneer het verzamelen van statistieken niet is ingeschakeld of wanneer statistieken verouderd zijn. Het is ook handig op plaatsen waar statisch afgeleide statistieken onnauwkeurig zijn, zoals in het midden van een gecompliceerde query of na het voorkomen van scheeftrekken van gegevens.
Functies
AQE is standaard ingeschakeld. Het heeft vier belangrijke functies:
- Hiermee wijzigt u de samenvoegbewerking dynamisch in broadcast-hash-join.
- Hiermee worden partities dynamisch samengevoegd (kleine partities combineren tot redelijk grote partities) na een willekeurige uitwisseling. Zeer kleine taken hebben slechtere I/O-doorvoer en hebben meestal meer last van planningsoverhead en overhead voor het instellen van taken. Door kleine taken te combineren, worden resources opgeslagen en wordt de clusterdoorvoer verbeterd.
- Verwerkt dynamisch scheefheid in samenvoeging en hash-join in willekeurige volgorde door scheeftrekkende taken te splitsen (en indien nodig te repliceren) in ongeveer gelijkmatige taken.
- Automatisch worden lege relaties gedetecteerd en doorgegeven.
Aanvraag
AQE is van toepassing op alle query's die:
- Niet-streaming
- Bevatten ten minste één uitwisseling (meestal wanneer er een join, aggregaat of venster is), één subquery of beide.
Niet alle AQE-toegepaste query's zijn noodzakelijkerwijs opnieuw geoptimaliseerd. De heroptimalisatie kan al dan niet een ander queryplan opleveren dan het plan dat statisch is gecompileerd. Als u wilt bepalen of het plan van een query is gewijzigd door AQE, raadpleegt u de volgende sectie, Queryplannen.
Queryplannen
In deze sectie wordt beschreven hoe u queryplannen op verschillende manieren kunt onderzoeken.
In deze sectie:
Apache Spark-gebruikersinterface
AdaptiveSparkPlan
-knooppunt
AQE-toegepaste query's bevatten een of meer AdaptiveSparkPlan
knooppunten, meestal als hoofdknooppunt van elke hoofdquery of subquery.
Voordat de query wordt uitgevoerd of wanneer deze wordt uitgevoerd, wordt de isFinalPlan
vlag van het bijbehorende AdaptiveSparkPlan
knooppunt weergegeven als false
; nadat de uitvoering van de query is voltooid, wordt de isFinalPlan
vlag gewijzigd in true.
Ontwikkelend plan
Het queryplandiagram ontwikkelt zich naarmate de uitvoering vordert en weerspiegelt het meest recente plan dat wordt uitgevoerd. Knooppunten die al zijn uitgevoerd (waarin metrische gegevens beschikbaar zijn) worden niet gewijzigd, maar knooppunten die na verloop van tijd niet kunnen worden gewijzigd als gevolg van heroptimalisaties.
Hier volgt een voorbeeld van een queryplandiagram:
DataFrame.explain()
AdaptiveSparkPlan
-knooppunt
AQE-toegepaste query's bevatten een of meer AdaptiveSparkPlan
knooppunten, meestal als hoofdknooppunt van elke hoofdquery of subquery. Voordat de query wordt uitgevoerd of wanneer deze wordt uitgevoerd, wordt de isFinalPlan
vlag van het bijbehorende AdaptiveSparkPlan
knooppunt weergegeven als false
; nadat de uitvoering van de query is voltooid, wordt de isFinalPlan
vlag gewijzigd in true
.
Huidig en eerste plan
Onder elk AdaptiveSparkPlan
knooppunt zijn zowel het eerste plan (het plan voor het toepassen van AQE-optimalisaties) als het huidige of het uiteindelijke plan, afhankelijk van of de uitvoering is voltooid. Het huidige plan zal zich ontwikkelen naarmate de uitvoering vordert.
Runtimestatistieken
Elke shuffle- en broadcastfase bevat gegevensstatistieken.
Voordat de fase wordt uitgevoerd of wanneer de fase wordt uitgevoerd, zijn de statistieken schattingen van de compilatietijd en de vlag isRuntime
, false
bijvoorbeeld: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
Nadat de uitvoering van de fase is voltooid, zijn de statistieken die tijdens runtime worden verzameld en wordt true
de vlag isRuntime
bijvoorbeeld:Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
Hier volgt een DataFrame.explain
voorbeeld:
Vóór de uitvoering
Tijdens de uitvoering
Na de uitvoering
SQL EXPLAIN
AdaptiveSparkPlan
-knooppunt
AQE-toegepaste query's bevatten een of meer AdaptiveSparkPlan-knooppunten, meestal als het hoofdknooppunt van elke hoofdquery of subquery.
Geen huidig abonnement
Omdat SQL EXPLAIN
de query niet wordt uitgevoerd, is het huidige plan altijd hetzelfde als het oorspronkelijke plan en wordt niet weergegeven wat uiteindelijk door AQE wordt uitgevoerd.
Hier volgt een voorbeeld van een SQL-uitleg:
Doeltreffendheid
Het queryplan wordt gewijzigd als een of meer AQE-optimalisaties van kracht worden. Het effect van deze AQE-optimalisaties wordt gedemonstreerd door het verschil tussen de huidige en definitieve plannen en het eerste plan en specifieke planknooppunten in de huidige en definitieve plannen.
De samenvoegbewerking dynamisch wijzigen in broadcast-hash-join: verschillende fysieke join-knooppunten tussen het huidige/definitieve plan en het eerste plan
Dynamisch samenvoegingspartities: knooppunt
CustomShuffleReader
met eigenschapCoalesced
Dynamisch afhandelen van scheefheidsdeelname: knooppunt
SortMergeJoin
met veldisSkew
als waar.Lege relaties dynamisch detecteren en doorgeven: een deel van (of geheel) het plan wordt vervangen door localTableScan van het knooppunt door het relationele veld als leeg.
Configuration
In deze sectie:
- Adaptieve queryuitvoering in- en uitschakelen
- Automatisch geoptimaliseerde shuffle inschakelen
- Samenvoegen samenvoegen dynamisch wijzigen in broadcast-hash-join
- Dynamisch samenvoegingspartities
- Scheefheidsdeelname dynamisch verwerken
- Lege relaties dynamisch detecteren en doorgeven
Adaptieve queryuitvoering in- en uitschakelen
Eigenschappen |
---|
spark.databricks.optimizer.adaptive.enabled Type: Boolean Of u adaptieve query's wilt in- of uitschakelen. Standaardwaarde: true |
Automatisch geoptimaliseerde shuffle inschakelen
Eigenschappen |
---|
spark.sql.shuffle.partitions Type: Integer Het standaardaantal partities dat moet worden gebruikt bij het opsnipperen van gegevens voor joins of aggregaties. Als u de waarde auto instelt, wordt automatisch geoptimaliseerde willekeurige volgorde ingeschakeld, waardoor dit aantal automatisch wordt bepaald op basis van het queryplan en de grootte van de queryinvoergegevens.Opmerking: Voor gestructureerd streamen kan deze configuratie niet worden gewijzigd tussen opnieuw opstarten van query's vanaf dezelfde controlepuntlocatie. Standaardwaarde: 200 |
Samenvoegen samenvoegen dynamisch wijzigen in broadcast-hash-join
Eigenschappen |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold Type: Byte String De drempelwaarde voor het activeren van overschakelen naar broadcast-join tijdens runtime. Standaardwaarde: 30MB |
Dynamisch samenvoegingspartities
Eigenschappen |
---|
spark.sql.adaptive.coalescePartitions.enabled Type: Boolean Of partities samenvoegen moeten worden in- of uitgeschakeld. Standaardwaarde: true |
spark.sql.adaptive.advisoryPartitionSizeInBytes Type: Byte String De doelgrootte na het samenvoegen. De grootten van de gesamenceerde partities liggen dicht bij, maar niet groter dan deze doelgrootte. Standaardwaarde: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize Type: Byte String De minimale grootte van partities na het samenvoegen. De grootten van de samengesamende partities zijn niet kleiner dan deze grootte. Standaardwaarde: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum Type: Integer Het minimale aantal partities na het samenvoegen. Niet aanbevolen, omdat het instellen expliciet overschrijft spark.sql.adaptive.coalescePartitions.minPartitionSize .Standaardwaarde: 2x nee. van clusterkernen |
Scheefheidsdeelname dynamisch verwerken
Eigenschappen |
---|
spark.sql.adaptive.skewJoin.enabled Type: Boolean Of u de verwerking van scheefheidsdeelname wilt in- of uitschakelen. Standaardwaarde: true |
spark.sql.adaptive.skewJoin.skewEdPartitionFactor Type: Integer Een factor die bij vermenigvuldiging met de mediaanpartitiegrootte bijdraagt aan het bepalen of een partitie scheef is. Standaardwaarde: 5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes Type: Byte String Een drempelwaarde die bijdraagt aan het bepalen of een partitie scheef is. Standaardwaarde: 256MB |
Een partitie wordt beschouwd als scheeftrekken wanneer beide (partition size > skewedPartitionFactor * median partition size)
en (partition size > skewedPartitionThresholdInBytes)
zijn true
.
Lege relaties dynamisch detecteren en doorgeven
Eigenschappen |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled Type: Boolean Hiermee wordt aangegeven of dynamische, lege relationele doorgifte moet worden ingeschakeld of uitgeschakeld. Standaardwaarde: true |
Veelgestelde vragen
In deze sectie:
- Waarom heeft AQE geen kleine jointabel uitgezonden?
- Moet ik nog steeds een hint voor een broadcast-joinstrategie gebruiken waarvoor AQE is ingeschakeld?
- Wat is het verschil tussen scheefheidsdeelnamehint en optimalisatie van AQE-joins? Welke moet ik gebruiken?
- Waarom heeft AQE mijn joinvolgorde niet automatisch aangepast?
- Waarom heeft AQE mijn gegevens scheeftrekken niet gedetecteerd?
Waarom heeft AQE geen kleine jointabel uitgezonden?
Als de grootte van de verwachte relatie onder deze drempelwaarde valt, maar nog steeds niet wordt uitgezonden:
- Controleer het jointype. Broadcast wordt niet ondersteund voor bepaalde jointypen, bijvoorbeeld de linkerrelatie van een
LEFT OUTER JOIN
kan niet worden uitgezonden. - Het kan ook zijn dat de relatie veel lege partities bevat. In dat geval kan het merendeel van de taken snel worden voltooid met sort merge join of kan deze worden geoptimaliseerd met de verwerking van scheve joins. AQE vermijdt het wijzigen van dergelijke samenvoegingsdeelnames om hash-joins uit te zenden als het percentage niet-lege partities lager is dan
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
.
Moet ik nog steeds een hint voor een broadcast-joinstrategie gebruiken waarvoor AQE is ingeschakeld?
Ja. Een statisch geplande broadcast-join is meestal beter presterend dan een dynamisch geplande join door AQE, omdat AQE mogelijk pas overschakelt naar broadcast-join nadat een willekeurige volgorde voor beide zijden van de join is uitgevoerd (waarbij de werkelijke relatiegrootten worden verkregen). Het gebruik van een broadcast-hint kan dus nog steeds een goede keuze zijn als u uw query goed kent. AQE respecteert queryhints op dezelfde manier als statische optimalisatie, maar kan dynamische optimalisaties toepassen die niet worden beïnvloed door de hints.
Wat is het verschil tussen scheefheidsdeelnamehint en optimalisatie van AQE-joins? Welke moet ik gebruiken?
Het wordt aanbevolen om te vertrouwen op AQE scheefheidsdeelnameverwerking in plaats van de scheefheidshint te gebruiken, omdat AQE-scheefheidsdeelname volledig automatisch is en in het algemeen beter presteert dan de hint-tegenhanger.
Waarom heeft AQE mijn joinvolgorde niet automatisch aangepast?
Dynamische joins opnieuw ordenen maakt geen deel uit van AQE.
Waarom heeft AQE mijn gegevens scheeftrekken niet gedetecteerd?
Er zijn twee groottevoorwaarden waaraan moet worden voldaan voor AQE om een partitie te detecteren als een scheve partitie:
- De partitiegrootte is groter dan de
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(standaard 256 MB) - De partitiegrootte is groter dan de mediaangrootte van alle partities en de scheve partitiefactor
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(standaard 5)
Bovendien is de ondersteuning voor scheefheidsafhandeling beperkt voor bepaalde jointypen, bijvoorbeeld in LEFT OUTER JOIN
, kunnen alleen scheeftrekken aan de linkerkant worden geoptimaliseerd.
Verouderd
De term Adaptieve uitvoering bestaat sinds Spark 1.6, maar de nieuwe AQE in Spark 3.0 is fundamenteel anders. Wat de functionaliteit betreft, voert Spark 1.6 alleen het gedeelte 'dynamisch samenvoegingspartities' uit. Wat de technische architectuur betreft, is de nieuwe AQE een framework van dynamische planning en herplanning van query's op basis van runtimestatistieken, die ondersteuning biedt voor diverse optimalisaties, zoals de optimalisaties die we in dit artikel hebben beschreven en kunnen worden uitgebreid om meer mogelijke optimalisaties mogelijk te maken.