Delen via


Uitvoering van adaptieve queries

Adaptieve queryuitvoering (AQE) is het opnieuw optimaliseren van query's die plaatsvinden tijdens het uitvoeren van query's.

De motivatie voor het opnieuw optimaliseren van runtimes is dat Azure Databricks de meest up-tonauwkeurige statistieken heeft aan het einde van een shuffle- en broadcast-uitwisseling (ook wel een queryfase in AQE genoemd). Als gevolg hiervan kan Azure Databricks kiezen voor een betere fysieke strategie, een optimale partitiegrootte en -aantal na de shuffle kiezen, of optimalisaties uitvoeren die voorheen hints vereisten, zoals bijvoorbeeld scheve join-afhandeling.

Dit kan zeer nuttig zijn wanneer het verzamelen van statistieken niet is ingeschakeld of wanneer statistieken verouderd zijn. Het is ook handig op plaatsen waar statisch afgeleide statistieken onnauwkeurig zijn, zoals in het midden van een gecompliceerde query of na het voorkomen van scheeftrekken van gegevens.

Mogelijkheden

AQE is standaard ingeschakeld. Het heeft vier belangrijke functies:

  • Wijzigt dynamisch de samenvoegbewerking in een broadcast-hash-join.
  • Partities worden dynamisch samengevoegd (kleine partities combineren tot een redelijk formaat) na een shuffle-uitwisseling. Zeer kleine taken hebben slechtere I/O-doorvoer en hebben meestal meer last van planningsoverhead en overhead voor het instellen van taken. Door kleine taken te combineren, bespaart men middelen en verbetert men de doorvoer van het cluster.
  • Verwerkt dynamisch scheefheid in sort merge-join en shuffle hash-join door scheefgetrokken taken te splitsen (en indien nodig te repliceren) in ongeveer even grote taken.
  • Automatisch worden lege relaties gedetecteerd en doorgegeven.

Toepassing

AQE is van toepassing op alle query's die:

  • Niet-streaming
  • Bevatten ten minste één uitwisseling (meestal wanneer er een join, aggregaat of venster is), één subquery of beide.

Niet alle AQE-toegepaste query's zijn noodzakelijkerwijs opnieuw geoptimaliseerd. De heroptimalisatie kan al dan niet een ander queryplan opleveren dan het plan dat statisch is gecompileerd. Als u wilt bepalen of het plan van een query is gewijzigd door AQE, raadpleegt u de volgende sectie, Query-plannen.

Queryplannen

In deze sectie wordt beschreven hoe u queryplannen op verschillende manieren kunt onderzoeken.

In deze sectie:

Spark-gebruikersinterface

AdaptiveSparkPlan knooppunt

AQE-toegepaste queries bevatten een of meerdere AdaptiveSparkPlan knooppunten, meestal als de hoofdnode van elke hoofdquery of subquery. Voordat de query wordt uitgevoerd of wanneer deze wordt uitgevoerd, wordt de isFinalPlan vlag van het bijbehorende AdaptiveSparkPlan knooppunt weergegeven als false; nadat de uitvoering van de query is voltooid, verandert de isFinalPlan vlag in true.

Evoluerend plan

Het queryplandiagram ontwikkelt zich naarmate de uitvoering vordert en weerspiegelt het meest recente plan dat wordt uitgevoerd. Knooppunten die al zijn uitgevoerd (waarin metrische gegevens beschikbaar zijn) zullen niet veranderen, maar knooppunten die nog niet zijn uitgevoerd, kunnen na verloop van tijd veranderen als gevolg van heroptimalisaties.

Hier volgt een voorbeeld van een queryplandiagram:

queryplan diagram

DataFrame.explain()

AdaptiveSparkPlan knooppunt

AQE-toegepaste queries bevatten een of meer AdaptiveSparkPlan-knooppunten, meestal als het hoofdknooppunt van elke hoofdquery of subquery. Voordat de query wordt uitgevoerd of wanneer deze wordt uitgevoerd, wordt de isFinalPlan vlag van het bijbehorende AdaptiveSparkPlan knooppunt weergegeven als false; Nadat de uitvoering van de query is voltooid, wordt de vlag isFinalPlan gewijzigd in true.

Huidig en eerste plan

Onder elk AdaptiveSparkPlan knooppunt zijn er zowel het eerste plan (het plan voordat AQE-optimalisaties worden toegepast) als het huidige of het uiteindelijke plan, afhankelijk van of de uitvoering is voltooid. Het huidige plan zal zich ontwikkelen naarmate de uitvoering vordert.

Runtimestatistieken

Elke shuffle- en broadcastfase bevat gegevensstatistieken.

Voordat de fase begint of wanneer de fase in uitvoering is, zijn de statistieken samensteltijdinschattingen, en is de vlag isRuntimefalse, bijvoorbeeld: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Nadat de uitvoering van de fase is voltooid, worden de statistieken verzameld tijdens runtime en wordt de vlag isRuntimetrue, bijvoorbeeld: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

Hier volgt een DataFrame.explain voorbeeld:

  • Vóór de uitvoering

    Vóór uitvoering

  • Tijdens de uitvoering

    Tijdens de uitvoering

  • Na de uitvoering

    na uitvoering

SQL EXPLAIN

AdaptiveSparkPlan knooppunt

AQE-toegepaste queries bevatten een of meer AdaptiveSparkPlan-knooppunten, meestal als hoofdknooppunt van elke hoofdquery of subquery.

Geen huidig abonnement

Omdat SQL EXPLAIN de query niet uitvoert, is het huidige plan altijd hetzelfde als het oorspronkelijke plan en wordt niet weergegeven wat uiteindelijk door AQE wordt uitgevoerd.

Hier volgt een voorbeeld van een SQL-uitleg:

SQL uitleggen

Effectiviteit

Het queryplan wordt gewijzigd als een of meer AQE-optimalisaties van kracht worden. Het effect van deze AQE-optimalisaties wordt gedemonstreerd door het verschil tussen de huidige en definitieve plannen en het eerste plan en specifieke planknooppunten in de huidige en definitieve plannen.

  • De samenvoeg-sortering dynamisch wijzigen in een uitzend-hash-join: verschillende fysieke samenvoegknooppunten tussen het huidige of definitieve plan en het beginplan.

    tekenreeks voor joinstrategie

  • Partities dynamisch samenvoegen: knooppunt CustomShuffleReader met eigenschap Coalesced

    aangepaste shuffle-lezer

    aangepaste schudlezertekenreeks

  • Scheve join dynamisch verwerken: knooppunt SortMergeJoin met veld isSkew als waar.

    scheef verbindingsreeks

  • Lege relaties dynamisch detecteren en doorgeven: een deel van (of het gehele) plan wordt vervangen door de LocalTableScan-knoop, waarbij het relationele veld leeg is.

    lokale tabelscan

    lokale tabelscan-tekst

Configuratie

In deze sectie:

Adaptieve queryuitvoering in- en uitschakelen

Eigenschap
spark.databricks.optimizer.adaptive.enabled
Type: Boolean
Of u adaptieve query-uitvoering wilt inschakelen of uitschakelen.
Standaardwaarde: true

Automatisch geoptimaliseerde shuffle inschakelen

Eigenschap
spark.sql.shuffle.partitions
Type: Integer
Het standaardaantal partities dat moet worden gebruikt bij het opsnipperen van gegevens voor joins of aggregaties. Door de waarde auto in te stellen, wordt de automatisch geoptimaliseerde shuffle ingeschakeld. Dit getal wordt dan automatisch bepaald op basis van het queryplan en de grootte van de queryinvoergegevens.
Opmerking: Voor gestructureerd streamen kan deze configuratie niet worden gewijzigd tussen opnieuw opstarten van query's vanaf dezelfde controlepuntlocatie.
Standaardwaarde: 200

Dynamisch sorteer-samenvoeg-verbinding wijzigen in uitzenden-hash-verbinding

Eigendom
spark.databricks.adaptive.autoBroadcastJoinThreshold
Type: Byte String
De drempelwaarde voor het activeren van overschakelen naar broadcast-join tijdens runtime.
Standaardwaarde: 30MB

Dynamisch samenvoegen van partities

Onroerend goed
spark.sql.adaptive.coalescePartitions.enabled
Type: Boolean
Of het samenvoegen van partities moet worden in- of uitgeschakeld.
Standaardwaarde: true
spark.sql.adaptive.advisoryPartitionSizeInBytes
Type: Byte String
De doelgrootte na het samenvoegen. De grootten van de gesamenceerde partities liggen dicht bij, maar niet groter dan deze doelgrootte.
Standaardwaarde: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize
Type: Byte String
De minimale grootte van partities na het samenvoegen. De grootten van de samengesamende partities zijn niet kleiner dan deze grootte.
Standaardwaarde: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum
Type: Integer
Het minimale aantal partities na het samenvoegen. Niet aanbevolen, omdat het instellen expliciet overschrijft
spark.sql.adaptive.coalescePartitions.minPartitionSize.
Standaardwaarde: 2x aantal clustercores

Scheefheidsdeelname dynamisch verwerken

Eigendom
spark.sql.adaptive.skewJoin.enabled
Type: Boolean
Of u het verwerken van schuine joins wilt in- of uitschakelen.
Standaardwaarde: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor
Type: Integer
Een factor die bij vermenigvuldiging met de mediaanpartitiegrootte bijdraagt aan het bepalen of een partitie scheef is.
Standaardwaarde: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
Type: Byte String
Een drempelwaarde die bijdraagt aan het bepalen of een partitie scheef is.
Standaardwaarde: 256MB

Een partitie wordt als scheef beschouwd wanneer zowel (partition size > skewedPartitionFactor * median partition size) als (partition size > skewedPartitionThresholdInBytes)truezijn.

Lege relaties dynamisch detecteren en doorgeven

Eigendom
spark.databricks.adaptive.emptyRelationPropagation.enabled
Type: Boolean
Hiermee wordt aangegeven of dynamische leeg-relatie propagatie moet worden in- of uitgeschakeld.
Standaardwaarde: true

Veelgestelde vragen (FAQ)

In deze sectie:

Waarom heeft AQE geen kleine join-tabel uitgezonden?

Als de grootte van de verwachte relatie onder deze drempelwaarde valt, maar nog steeds niet wordt uitgezonden:

  • Controleer het verbindingstype. Broadcast wordt niet ondersteund voor bepaalde jointypen, bijvoorbeeld de linkerrelatie van een LEFT OUTER JOIN kan niet worden uitgezonden.
  • Het kan ook zijn dat de relatie veel lege partities bevat. In dat geval kan het merendeel van de taken snel worden voltooid met sort merge join of kan deze worden geoptimaliseerd met de verwerking van scheve joins. AQE vermijdt het omzetten van sort merge joins naar broadcast-hash-joins als het percentage niet-lege partities lager is dan spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Kan ik nog steeds een hint voor een broadcast-joinstrategie gebruiken als AQE is ingeschakeld?

Ja. Een statisch geplande broadcast-join is meestal beter presterend dan een door AQE dynamisch geplande join, omdat AQE mogelijk pas overschakelt naar broadcast-join nadat voor beide zijden van de join een shuffle is uitgevoerd, tegen die tijd de werkelijke grootte van de relaties bekend zijn. Het gebruik van een broadcast-hint kan dus nog steeds een goede keuze zijn als u uw query goed kent. AQE respecteert queryhints op dezelfde manier als statische optimalisatie, maar kan dynamische optimalisaties toepassen die niet worden beïnvloed door de hints.

Wat is het verschil tussen skew join-hint en AQE scheve join- optimalisatie? Welke moet ik gebruiken?

Het wordt aanbevolen om te vertrouwen op AQE scheefheidsdeelnameverwerking in plaats van de scheefheidshint te gebruiken, omdat AQE-scheefheidsdeelname volledig automatisch is en in het algemeen beter presteert dan de hint-tegenhanger.

Waarom heeft AQE niet automatisch mijn join-volgorde aangepast?

Dynamische joins opnieuw ordenen maakt geen deel uit van AQE.

Waarom heeft AQE mijn gegevens scheeftrekken niet gedetecteerd?

Er zijn twee groottevoorwaarden waaraan moet worden voldaan voor AQE om een partitie te detecteren als een scheve partitie:

  • De partitiegrootte is groter dan de spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (standaard 256 MB)
  • De partitiegrootte is groter dan de mediaangrootte van alle partities en de scheve partitiefactor spark.sql.adaptive.skewJoin.skewedPartitionFactor (standaard 5)

Bovendien is de ondersteuning voor de optimalisatie van scheeftrekking beperkt voor bepaalde jointypen, bijvoorbeeld in LEFT OUTER JOINkan alleen scheeftrekking aan de linkerkant worden geoptimaliseerd.

Nalatenschap

De term Adaptieve uitvoering bestaat sinds Spark 1.6, maar de nieuwe AQE in Spark 3.0 is fundamenteel anders. Wat functionaliteit betreft, voert Spark 1.6 alleen de 'dynamisch samenvoegen van partities' uit. Wat de technische architectuur betreft, is de nieuwe AQE een framework van dynamische planning en herplanning van query's op basis van runtimestatistieken, die ondersteuning biedt voor diverse optimalisaties, zoals de optimalisaties die we in dit artikel hebben beschreven en kunnen worden uitgebreid om meer mogelijke optimalisaties mogelijk te maken.