Udostępnij za pośrednictwem


Adaptacyjne wykonywanie zapytań

Wykonywanie zapytań adaptacyjnych (AQE) to ponowna optymalizacja zapytań wykonywana podczas wykonywania zapytania.

Motywacją do ponownej optymalizacji środowiska uruchomieniowego jest to, że usługa Azure Databricks ma najwięcej up-to— dokładne statystyki na końcu wymiany mieszania i emisji (określane jako etap zapytania w AQE). W związku z tym usługa Azure Databricks może wybrać lepszą strategię fizyczną, ustalić optymalny rozmiar i liczbę po przetasowaniu partition lub przeprowadzić optymalizacje, które wcześniej wymagały wskazówek, na przykład obsługę skośności join.

Może to być bardzo przydatne, gdy zbieranie statystyk nie jest włączone lub gdy statystyki są nieaktualne. Jest to również przydatne w miejscach, where statycznie pochodne statystyki są niedokładne, takie jak w środku skomplikowanego zapytania lub po wystąpieniu niesymetryczności danych.

Możliwości

Usługa AQE jest domyślnie włączona. Ma 4 główne funkcje:

  • Dynamicznie zmienia scalanie sortujące join na rozgłoszony hasz join.
  • Dynamicznie scala partycje (łączenie małych partycji w większe) po wymianie danych. Bardzo małe zadania mają gorszą przepływność operacji we/wy i mają tendencję do większego obciążenia związanego z planowaniem i konfiguracją zadań. Łączenie małych zadań pozwala zaoszczędzić zasoby i zwiększyć przepływność klastra.
  • Dynamicznie obsługuje niesymetryczność w scalaniu sortowania join i mieszania skrótów join przez podzielenie (i replikowanie w razie potrzeby) niesymetrycznych zadań na mniej więcej zadania o równomiernym rozmiarze.
  • Dynamicznie wykrywa i propaguje puste relacje.

Aplikacja

Usługa AQE ma zastosowanie do wszystkich zapytań, które są następujące:

  • Brak przesyłania strumieniowego
  • Zawiera co najmniej jedną wymianę (zwykle w przypadku join, agregatu lub window), jedno podzapytanie lub oba te elementy.

Nie wszystkie zapytania stosowane w usłudze AQE muszą zostać ponownie zoptymalizowane. Ponowna optymalizacja może lub nie może wymyślić innego planu zapytania niż statycznie skompilowany. Aby ustalić, czy plan zapytania został zmieniony przez usługę AQE, zobacz następującą sekcję Plany zapytań.

Plany zapytań

W tej sekcji omówiono sposób analizowania planów zapytań na różne sposoby.

W tej sekcji:

Interfejs użytkownika platformy Spark

węzeł AdaptiveSparkPlan

Zapytania zastosowane w usłudze AQE zawierają co najmniej jeden węzeł AdaptiveSparkPlan, zwykle jako węzeł główny każdego zapytania głównego lub zapytania podrzędnego. Przed uruchomieniem zapytania lub jego uruchomieniu flaga isFinalPlan odpowiedniego węzła AdaptiveSparkPlan jest wyświetlana jako false; po zakończeniu wykonywania zapytania flaga isFinalPlan zmieni się na true.

Ewoluujący plan

Diagram planu zapytania ewoluuje wraz z postępem wykonywania i odzwierciedla najbardziej bieżący plan, który jest wykonywany. Węzły, które zostały już wykonane (w których metryki są dostępne), nie ulegną zmianie, ale te, które nie zostały jeszcze wykonane, mogą się zmieniać z czasem w wyniku ponownej optymalizacji.

Poniżej przedstawiono przykład diagramu planu zapytania:

diagram zapytań planu

DataFrame.explain()

węzeł AdaptiveSparkPlan

Zapytania zastosowane w usłudze AQE zawierają co najmniej jeden węzeł AdaptiveSparkPlan, zwykle jako węzeł główny każdego zapytania głównego lub zapytania podrzędnego. Przed uruchomieniem zapytania lub jego uruchomieniu flaga isFinalPlan odpowiedniego węzła AdaptiveSparkPlan jest wyświetlana jako false; po zakończeniu wykonywania zapytania flaga isFinalPlan zmieni się na true.

Bieżący i początkowy plan

W każdym węźle AdaptiveSparkPlan będzie istnieć zarówno plan początkowy (plan przed zastosowaniem optymalizacji AQE), jak i bieżący lub końcowy plan, w zależności od tego, czy wykonanie zostało ukończone. Bieżący plan będzie ewoluował w miarę postępu wykonywania.

Statystyki środowiska uruchomieniowego

Każdy etap mieszania i emisji zawiera statystyki danych.

Przed uruchomieniem etapu lub po uruchomieniu etapu statystyki są szacowane w czasie kompilacji, a flaga isRuntime jest false, na przykład: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Po zakończeniu wykonywania etapu, statystyki to te zebrane w czasie wykonywania, a flaga isRuntime zmieni się na true, na przykład: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true).

Poniżej przedstawiono przykład DataFrame.explain:

  • Przed wykonaniem

    przed wykonaniem

  • Podczas wykonywania

    Podczas wykonywania

  • Po wykonaniu

    po wykonaniu

SQL EXPLAIN

węzeł AdaptiveSparkPlan

Zapytania stosujące AQE zawierają co najmniej jeden węzeł AdaptiveSparkPlan, zazwyczaj jako węzeł główny każdego głównego zapytania lub podzapytania.

Brak bieżącego planu

Ponieważ SQL EXPLAIN nie wykonuje zapytania, bieżący plan jest zawsze taki sam jak plan początkowy i nie odzwierciedla tego, co ostatecznie get wykonywane przez AQE.

Poniżej przedstawiono przykład SQL explain:

SQL explain

Skuteczność

Plan zapytania zmieni się, jeśli co najmniej jedna optymalizacja AQE wejdzie w życie. Efekt tych optymalizacji AQE jest przedstawiony przez różnicę między bieżącymi i końcowymi planami oraz początkowym planem i określonymi węzłami planu w bieżących i końcowych planach.

  • Dynamicznie zmień scalanie sortowania join na hasz rozgłoszeniowy join: różne fizyczne węzły join między bieżącym/końcowym planem a planem początkowym.

    Join ciąg strategii

  • Dynamiczne łączenie partycji: węzeł CustomShuffleReader z właściwością Coalesced

    niestandardowy czytnik do mieszania

    niestandardowy ciąg czytnika losowego mieszania

  • Dynamiczne przetwarzanie odchylenia join: węzeł SortMergeJoin z polem isSkew jako prawdziwe.

    Skoś join planu

    Skew join ciągu

  • Dynamiczne wykrywanie i propagowanie pustych relacji: część (lub całość) planu jest zastępowana przez węzeł LocalTableScan z pustym polem relacji.

    lokalne table skanowanie

    ciąg skanowania lokalnego table

Konfiguracja

W tej sekcji:

Włączanie i wyłączanie wykonywania zapytań adaptacyjnych

Własność
spark.databricks.optimizer.adaptive.enabled

Typ: Boolean

Czy włączyć lub wyłączyć wykonywanie zapytań adaptacyjnych.

Wartość domyślna: true

Włącz automatycznie zoptymalizowane mieszanie

Własność
spark.sql.shuffle.partitions

Typ: Integer

Domyślna liczba partycji do użycia podczas mieszania danych dla sprzężeń lub agregacji. Ustawienie wartości auto umożliwia automatyczną optymalizację mieszania, która automatycznie określa tę liczbę na podstawie planu zapytania i rozmiaru danych wejściowych zapytania.

Uwaga: w przypadku przesyłania strumieniowego ze strukturą tej konfiguracji nie można zmienić między ponownymi uruchomieniami zapytania z tej samej lokalizacji punktu kontrolnego.

Wartość domyślna: 200

Dynamicznie zmieniaj scalanie sortowania join na join skrótów emisji

Własność
spark.databricks.adaptive.autoBroadcastJoinThreshold

Typ: Byte String

Próg wyzwalający przełączenie na emisję join podczas działania.

Wartość domyślna: 30MB

Dynamiczne łączenie partycji

Własność
spark.sql.adaptive.coalescePartitions.enabled

Typ: Boolean

Czy włączyć lub wyłączyć łączenie partition.

Wartość domyślna: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Typ: Byte String

Rozmiar docelowy po połączeniu. Połączone rozmiary partition będą bliskie, ale nie przekraczające tego rozmiaru docelowego.

Wartość domyślna: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Typ: Byte String

Minimalny rozmiar partycji po scaleniu. Połączone rozmiary partition nie będą mniejsze niż ten rozmiar.

Wartość domyślna: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Typ: Integer

Minimalna liczba partycji po scaleniu. Nie jest zalecane, ponieważ jawne ustawienie nadpisuje wcześniejsze ustawienia.
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Wartość domyślna: 2x nr. rdzeni klastra

Dynamiczna obsługa odchylenia join

Własność
spark.sql.adaptive.skewJoin.enabled

Typ: Boolean

Czy włączyć lub wyłączyć obsługę nachylenia join.

Wartość domyślna: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

Typ: Integer

Czynnik, który po pomnożeniu przez medianę rozmiaru partition przyczynia się do określenia, czy partition jest niesymetryczny.

Wartość domyślna: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Typ: Byte String

Próg, który przyczynia się do określenia, czy partition jest niesymetryczny.

Wartość domyślna: 256MB

partition jest uważany za niesymetryczny, gdy (partition size > skewedPartitionFactor * median partition size) i (partition size > skewedPartitionThresholdInBytes)true.

Dynamiczne wykrywanie i propagowanie pustych relacji

Własność
spark.databricks.adaptive.emptyRelationPropagation.enabled

Typ: Boolean

Czy włączyć lub wyłączyć dynamiczne rozpowszechnianie pustych relacji.

Wartość domyślna: true

Często zadawane pytania

W tej sekcji:

Dlaczego AQE nie emitowała małego jointable?

Jeśli rozmiar relacji, która ma być nadana, jest poniżej tego progu, ale nadal nie jest nadawana:

  • Sprawdź typ join. Emisja nie jest obsługiwana w przypadku niektórych typów join, na przykład lewej relacji LEFT OUTER JOIN nie można rozgłaszać.
  • Może się również zdarzyć, że relacja zawiera wiele pustych partycji, w takim przypadku większość zadań może szybko zakończyć się sortowaniem przez scalanie join lub potencjalnie można ją zoptymalizować poprzez obsługę przechyłu join. AQE unika zmiany takich sortowanych sprzężeń scalających na rozgłaszanie połączeń z haszowaniem, jeśli wartość procentowa partycji niepustych jest niższa niż spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Czy nadal należy używać wskazówki strategii emisji join z włączoną funkcją AQE?

Tak. Statycznie planowana emisja join jest zwykle bardziej wydajna niż dynamicznie planowana przez AQE, ponieważ AQE może nie przełączyć się na emisję join, dopóki nie wykona mieszania dla obu stron join (w tym czasie rzeczywiste rozmiary relacji są uzyskiwane). Dlatego użycie wskazówki emisji może nadal być dobrym wyborem, jeśli dobrze znasz zapytanie. AQE będzie przestrzegać wskazówek dotyczących zapytań w taki sam sposób jak optymalizacja statyczna, ale nadal może stosować optymalizacje dynamiczne, na które wskazówki nie mają wpływu.

Jaka jest różnica pomiędzy przesunięciem join jako wskazówką a optymalizacją przesunięcia AQE join? Którego z nich należy użyć?

Zaleca się poleganie na obsługiwaniu przechylenia AQE join zamiast używać wskazówki dotyczącej przechylenia join, ponieważ przechylenie AQE join jest całkowicie automatyczne i zwykle działa lepiej niż wskazówka.

Dlaczego AQE nie dostosowało mojego zamówienia join automatycznie?

Dynamiczne porządkowanie join nie jest częścią AQE.

Dlaczego usługa AQE nie wykryła niesymetryczności danych?

Istnieją dwa warunki rozmiaru, które muszą zostać spełnione, aby AQE wykryło partition jako skośne partition:

  • Rozmiar partition jest większy niż spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (domyślnie 256 MB)
  • Rozmiar partition jest większy niż mediana rozmiarów wszystkich partycji pomnożona przez współczynnik niesymetryczności partitionspark.sql.adaptive.skewJoin.skewedPartitionFactor (wartość domyślna 5).

Ponadto obsługa przechyłu jest ograniczona dla niektórych typów join, na przykład w LEFT OUTER JOINmożna zoptymalizować przechył tylko po lewej stronie.

Spuścizna

Termin "Wykonywanie adaptacyjne" istnieje od wersji Spark 1.6, ale nowa AQE na platformie Spark 3.0 jest zasadniczo inna. Pod względem funkcjonalności, Spark 1.6 wykonuje tylko dynamiczne scalanie partycji. Jeśli chodzi o architekturę techniczną, nowa AQE to struktura dynamicznego planowania i ponownego planowania zapytań na podstawie statystyk środowiska uruchomieniowego, która obsługuje różne optymalizacje, takie jak te opisane w tym artykule i można rozszerzyć, aby umożliwić bardziej potencjalne optymalizacje.