Udostępnij za pośrednictwem


Adaptacyjne wykonywanie zapytań

Adaptacyjne wykonywanie zapytań (AQE) to ponowna optymalizacja zapytań wykonywana podczas wykonywania zapytania.

Motywacją do ponownej optymalizacji środowiska uruchomieniowego jest to, że usługa Azure Databricks ma najbardziej aktualne dokładne statystyki na końcu wymiany shuffle i emisji (określane jako etap zapytania w AQE). W związku z tym usługa Azure Databricks może zdecydować się na lepszą strategię fizyczną, wybrać optymalny rozmiar partycji i liczbę po przetasowaniach lub przeprowadzić optymalizacje, które były używane do wymagania wskazówek, na przykład obsługi niesymetryczności sprzężenia.

Może to być bardzo przydatne, gdy zbieranie statystyk nie jest włączone lub gdy statystyki są nieaktualne. Jest to również przydatne w miejscach, w których statycznie pochodne statystyki są niedokładne, takie jak w środku skomplikowanego zapytania lub po wystąpieniu niesymetryczności danych.

Funkcje

Usługa AQE jest domyślnie włączona. Ma 4 główne funkcje:

  • Dynamicznie zmienia sortowanie scalania sprzężenia w sprzężenia skrótu emisji.
  • Dynamicznie łączy partycje (połącz małe partycje w odpowiednio duże partycje) po zmianie rozmiaru. Bardzo małe zadania mają gorszą przepływność operacji we/wy i mają tendencję do większego obciążenia związanego z planowaniem i konfiguracją zadań. Łączenie małych zadań pozwala zaoszczędzić zasoby i zwiększyć przepływność klastra.
  • Dynamicznie obsługuje niesymetryczne sprzężenie scalania sortowania i sprzężenie skrótu shuffle przez podzielenie (i replikowanie w razie potrzeby) niesymetrycznych zadań w mniej więcej zadania o równomiernym rozmiarze.
  • Dynamicznie wykrywa i propaguje puste relacje.

Aplikacja

Usługa AQE ma zastosowanie do wszystkich zapytań, które są następujące:

  • Brak przesyłania strumieniowego
  • Zawiera co najmniej jedną wymianę (zwykle w przypadku sprzężenia, agregacji lub okna), jednego zapytania podrzędnego lub obu tych elementów.

Nie wszystkie zapytania stosowane w usłudze AQE muszą zostać ponownie zoptymalizowane. Ponowna optymalizacja może lub nie może wymyślić innego planu zapytania niż statycznie skompilowany. Aby określić, czy plan zapytania został zmieniony przez usługę AQE, zobacz poniższą sekcję Plany zapytań.

Plany zapytań

W tej sekcji omówiono sposób analizowania planów zapytań na różne sposoby.

W tej sekcji:

Interfejs użytkownika platformy Spark

Węzeł AdaptiveSparkPlan

Zapytania zastosowane w usłudze AQE zawierają co najmniej jeden AdaptiveSparkPlan węzeł główny każdego zapytania głównego lub zapytania podrzędnego. Przed uruchomieniem zapytania lub jego uruchomieniu isFinalPlan flaga odpowiedniego AdaptiveSparkPlan węzła będzie wyświetlana jako false; po zakończeniu wykonywania zapytania flaga isFinalPlan zmieni się na true.

Ewoluujący plan

Diagram planu zapytania ewoluuje wraz z postępem wykonywania i odzwierciedla najbardziej bieżący plan, który jest wykonywany. Węzły, które zostały już wykonane (w których metryki są dostępne), nie zostaną zmienione, ale te, które nie mogły się zmieniać w czasie w wyniku ponownej optymalizacji.

Poniżej przedstawiono przykład diagramu planu zapytania:

Query plan diagram

DataFrame.explain()

Węzeł AdaptiveSparkPlan

Zapytania zastosowane w usłudze AQE zawierają co najmniej jeden AdaptiveSparkPlan węzeł główny każdego zapytania głównego lub zapytania podrzędnego. Zanim zapytanie zostanie uruchomione lub gdy jest uruchomione, isFinalPlan flaga odpowiedniego AdaptiveSparkPlan węzła będzie wyświetlana jako false; po zakończeniu wykonywania zapytania flaga isFinalPlan zmieni się na true.

Bieżący i początkowy plan

W każdym AdaptiveSparkPlan węźle będzie dostępny zarówno początkowy plan (plan przed zastosowaniem optymalizacji AQE), jak i bieżący lub końcowy plan, w zależności od tego, czy wykonanie zostało ukończone. Bieżący plan będzie ewoluował w miarę postępu wykonywania.

Statystyki środowiska uruchomieniowego

Każdy etap mieszania i emisji zawiera statystyki danych.

Przed uruchomieniem etapu lub po uruchomieniu etapu statystyki są szacowane w czasie kompilacji, a flaga isRuntime to false, na przykład: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Po zakończeniu wykonywania etapu statystyki są zbierane w czasie wykonywania, a flaga isRuntime stanie się true, na przykład: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

DataFrame.explain Oto przykład:

  • Przed wykonaniem

    Before execution

  • Podczas wykonywania

    During execution

  • Po wykonaniu

    After execution

SQL EXPLAIN

Węzeł AdaptiveSparkPlan

Zapytania zastosowane w usłudze AQE zawierają co najmniej jeden węzeł AdaptiveSparkPlan, zwykle jako węzeł główny każdego zapytania głównego lub podrzędnego zapytania.

Brak bieżącego planu

Ponieważ SQL EXPLAIN nie wykonuje zapytania, bieżący plan jest zawsze taki sam jak plan początkowy i nie odzwierciedla tego, co ostatecznie zostanie wykonane przez usługę AQE.

Poniżej przedstawiono przykład objaśnienia sql:

SQL explain

Skuteczności

Plan zapytania zmieni się, jeśli co najmniej jedna optymalizacja AQE zacznie obowiązywać. Efekt tych optymalizacji AQE jest przedstawiony przez różnicę między bieżącymi i końcowymi planami oraz początkowym planem i określonymi węzłami planu w bieżących i końcowych planach.

  • Dynamicznie zmieniaj sprzężenia scalania sortowania w sprzężenia skrótów emisji: różne fizyczne węzły sprzężenia między bieżącym/końcowym planem a planem początkowym

    Join strategy string

  • Dynamiczne łączenie partycji: węzeł CustomShuffleReader z właściwością Coalesced

    Custom shuffle reader

    Custom shuffle reader string

  • Dynamiczne obsługa sprzężenia niesymetrycznego: węzeł SortMergeJoin z polem isSkew ma wartość true.

    Skew join plan

    Skew join string

  • Dynamiczne wykrywanie i propagowanie pustych relacji: część (lub cała) planu jest zastępowana przez węzeł LocalTableScan z polem relacji jako puste.

    Local table scan

    Local table scan string

Konfigurowanie

W tej sekcji:

Włączanie i wyłączanie wykonywania zapytań adaptacyjnych

Właściwości
spark.databricks.optimizer.adaptive.enabled

Typ: Boolean

Czy włączyć lub wyłączyć wykonywanie zapytań adaptacyjnych.

Wartość domyślna: true

Włączanie automatycznego mieszania zoptymalizowanego

Właściwości
spark.sql.shuffle.partitions

Typ: Integer

Domyślna liczba partycji do użycia podczas mieszania danych dla sprzężeń lub agregacji. Ustawienie wartości auto umożliwia automatyczną optymalizację mieszania, która automatycznie określa tę liczbę na podstawie planu zapytania i rozmiaru danych wejściowych zapytania.

Uwaga: w przypadku przesyłania strumieniowego ze strukturą tej konfiguracji nie można zmienić między ponownymi uruchomieniami zapytania z tej samej lokalizacji punktu kontrolnego.

Wartość domyślna: 200

Dynamicznie zmieniaj sprzężenia scalania sortowania w sprzężenia skrótu emisji

Właściwości
spark.databricks.adaptive.autoBroadcastJoinThreshold

Typ: Byte String

Próg wyzwalający przełączanie na sprzężenie rozgłaszane w czasie wykonywania.

Wartość domyślna: 30MB

Dynamiczne łączenie partycji

Właściwości
spark.sql.adaptive.coalescePartitions.enabled

Typ: Boolean

Czy włączyć lub wyłączyć łączenie partycji.

Wartość domyślna: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Typ: Byte String

Rozmiar docelowy po łączenia. Rozmiary partycji z połączeniem będą zbliżone do rozmiaru, ale nie większe niż ten rozmiar docelowy.

Wartość domyślna: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Typ: Byte String

Minimalny rozmiar partycji po łączenia. Rozmiary partycji z połączeniem nie będą mniejsze niż ten rozmiar.

Wartość domyślna: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Typ: Integer

Minimalna liczba partycji po łączeniach. Niezalecane, ponieważ jawne zastępowanie ustawień
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Wartość domyślna: 2x nr. rdzeni klastra

Dynamiczne obsługa sprzężenia niesymetrycznego

Właściwości
spark.sql.adaptive.skewJoin.enabled

Typ: Boolean

Czy włączyć lub wyłączyć obsługę sprzężenia niesymetrycznego.

Wartość domyślna: true
spark.sql.adaptive.skewJoin.skew.skewedPartitionFactor

Typ: Integer

Czynnik, który po pomnożony przez rozmiar partycji mediany przyczynia się do określenia, czy partycja jest niesymetryczna.

Wartość domyślna: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Typ: Byte String

Próg, który przyczynia się do określenia, czy partycja jest niesymetryczna.

Wartość domyślna: 256MB

Partycja jest uważana za niesymetryczną, gdy zarówno , jak (partition size > skewedPartitionFactor * median partition size) i (partition size > skewedPartitionThresholdInBytes)true.

Dynamiczne wykrywanie i propagowanie pustych relacji

Właściwości
spark.databricks.adaptive.emptyRelationPropagation.enabled

Typ: Boolean

Czy włączyć lub wyłączyć propagację dynamicznej pustej relacji.

Wartość domyślna: true

Często zadawane pytania

W tej sekcji:

Dlaczego AQE nie emitowała małej tabeli sprzężenia?

Jeśli rozmiar relacji oczekiwanej do emisji spadnie poniżej tego progu, ale nadal nie jest emitowany:

  • Sprawdź typ sprzężenia. Emisja nie jest obsługiwana w przypadku niektórych typów sprzężeń, na przykład nie można rozgłasić lewej relacji obiektu LEFT OUTER JOIN .
  • Może to być również, że relacja zawiera wiele pustych partycji, w takim przypadku większość zadań może zakończyć się szybko za pomocą sprzężenia scalania sortowania lub może być potencjalnie zoptymalizowana z obsługą sprzężenia niesymetrycznego. AQE unika zmiany takich sprzężeń scalania sortowania w celu rozgłaszania sprzężeń skrótów, jeśli procent partycji niepustych jest niższy niż spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Czy nadal należy używać wskazówki strategii dołączania emisji z włączoną funkcją AQE?

Tak. Statycznie planowane sprzężenie emisji jest zwykle bardziej wydajne niż dynamicznie planowane przez AQE, ponieważ AQE może nie przełączyć się na sprzężenie emisji, dopóki nie zostanie wykonane przetasowanie dla obu stron sprzężenia (w tym czasie rzeczywiste rozmiary relacji są uzyskiwane). Dlatego użycie wskazówki emisji może nadal być dobrym wyborem, jeśli dobrze znasz zapytanie. Funkcja AQE będzie przestrzegać wskazówek dotyczących zapytań w taki sam sposób, jak w przypadku optymalizacji statycznej, ale nadal może stosować optymalizacje dynamiczne, które nie mają wpływu na wskazówki.

Jaka jest różnica między niesymetryczną wskazówką sprzężenia a optymalizacją sprzężenia niesymetrycznego AQE? Którego z nich należy użyć?

Zaleca się poleganie na obsłudze sprzężenia niesymetrycznego AQE, a nie używania wskazówek sprzężenia niesymetrycznego, ponieważ sprzężenie AQE jest całkowicie automatyczne i ogólnie działa lepiej niż odpowiednik wskazówek.

Dlaczego usługa AQE nie dostosowyła kolejności sprzężenia automatycznie?

Zmiana kolejności sprzężeń dynamicznych nie jest częścią AQE.

Dlaczego usługa AQE nie wykryła niesymetryczności danych?

Istnieją dwa warunki rozmiaru, które muszą zostać spełnione, aby usługa AQE wykryła partycję jako niesymetryczną partycję:

  • Rozmiar partycji jest większy niż spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (domyślnie 256 MB)
  • Rozmiar partycji jest większy niż mediana rozmiaru wszystkich partycji w czasie niesymetryczności współczynnika spark.sql.adaptive.skewJoin.skewedPartitionFactor partycji (wartość domyślna 5)

Ponadto obsługa niesymetryczności jest ograniczona dla niektórych typów sprzężenia, na przykład w LEFT OUTER JOINsystemie można zoptymalizować tylko niesymetryczność po lewej stronie.

Starsza wersja

Termin "Wykonywanie adaptacyjne" istnieje od wersji Spark 1.6, ale nowa AQE na platformie Spark 3.0 jest zasadniczo inna. Jeśli chodzi o funkcjonalność, platforma Spark 1.6 wykonuje tylko część "dynamicznie łączą partycje". Jeśli chodzi o architekturę techniczną, nowa AQE to struktura dynamicznego planowania i ponownego planowania zapytań na podstawie statystyk środowiska uruchomieniowego, która obsługuje różne optymalizacje, takie jak te opisane w tym artykule i można rozszerzyć, aby umożliwić bardziej potencjalne optymalizacje.