Adaptacyjne wykonywanie zapytań
Adaptacyjne wykonywanie zapytań (AQE) to ponowna optymalizacja zapytań wykonywana podczas wykonywania zapytania.
Motywacją do ponownej optymalizacji środowiska uruchomieniowego jest to, że usługa Azure Databricks ma najbardziej aktualne dokładne statystyki na końcu wymiany shuffle i emisji (określane jako etap zapytania w AQE). W związku z tym usługa Azure Databricks może zdecydować się na lepszą strategię fizyczną, wybrać optymalny rozmiar partycji i liczbę po przetasowaniach lub przeprowadzić optymalizacje, które były używane do wymagania wskazówek, na przykład obsługi niesymetryczności sprzężenia.
Może to być bardzo przydatne, gdy zbieranie statystyk nie jest włączone lub gdy statystyki są nieaktualne. Jest to również przydatne w miejscach, w których statycznie pochodne statystyki są niedokładne, takie jak w środku skomplikowanego zapytania lub po wystąpieniu niesymetryczności danych.
Funkcje
Usługa AQE jest domyślnie włączona. Ma 4 główne funkcje:
- Dynamicznie zmienia sortowanie scalania sprzężenia w sprzężenia skrótu emisji.
- Dynamicznie łączy partycje (połącz małe partycje w odpowiednio duże partycje) po zmianie rozmiaru. Bardzo małe zadania mają gorszą przepływność operacji we/wy i mają tendencję do większego obciążenia związanego z planowaniem i konfiguracją zadań. Łączenie małych zadań pozwala zaoszczędzić zasoby i zwiększyć przepływność klastra.
- Dynamicznie obsługuje niesymetryczne sprzężenie scalania sortowania i sprzężenie skrótu shuffle przez podzielenie (i replikowanie w razie potrzeby) niesymetrycznych zadań w mniej więcej zadania o równomiernym rozmiarze.
- Dynamicznie wykrywa i propaguje puste relacje.
Aplikacja
Usługa AQE ma zastosowanie do wszystkich zapytań, które są następujące:
- Brak przesyłania strumieniowego
- Zawiera co najmniej jedną wymianę (zwykle w przypadku sprzężenia, agregacji lub okna), jednego zapytania podrzędnego lub obu tych elementów.
Nie wszystkie zapytania stosowane w usłudze AQE muszą zostać ponownie zoptymalizowane. Ponowna optymalizacja może lub nie może wymyślić innego planu zapytania niż statycznie skompilowany. Aby określić, czy plan zapytania został zmieniony przez usługę AQE, zobacz poniższą sekcję Plany zapytań.
Plany zapytań
W tej sekcji omówiono sposób analizowania planów zapytań na różne sposoby.
W tej sekcji:
Interfejs użytkownika platformy Spark
Węzeł AdaptiveSparkPlan
Zapytania zastosowane w usłudze AQE zawierają co najmniej jeden AdaptiveSparkPlan
węzeł główny każdego zapytania głównego lub zapytania podrzędnego.
Przed uruchomieniem zapytania lub jego uruchomieniu isFinalPlan
flaga odpowiedniego AdaptiveSparkPlan
węzła będzie wyświetlana jako false
; po zakończeniu wykonywania zapytania flaga isFinalPlan
zmieni się na true.
Ewoluujący plan
Diagram planu zapytania ewoluuje wraz z postępem wykonywania i odzwierciedla najbardziej bieżący plan, który jest wykonywany. Węzły, które zostały już wykonane (w których metryki są dostępne), nie zostaną zmienione, ale te, które nie mogły się zmieniać w czasie w wyniku ponownej optymalizacji.
Poniżej przedstawiono przykład diagramu planu zapytania:
DataFrame.explain()
Węzeł AdaptiveSparkPlan
Zapytania zastosowane w usłudze AQE zawierają co najmniej jeden AdaptiveSparkPlan
węzeł główny każdego zapytania głównego lub zapytania podrzędnego. Zanim zapytanie zostanie uruchomione lub gdy jest uruchomione, isFinalPlan
flaga odpowiedniego AdaptiveSparkPlan
węzła będzie wyświetlana jako false
; po zakończeniu wykonywania zapytania flaga isFinalPlan
zmieni się na true
.
Bieżący i początkowy plan
W każdym AdaptiveSparkPlan
węźle będzie dostępny zarówno początkowy plan (plan przed zastosowaniem optymalizacji AQE), jak i bieżący lub końcowy plan, w zależności od tego, czy wykonanie zostało ukończone. Bieżący plan będzie ewoluował w miarę postępu wykonywania.
Statystyki środowiska uruchomieniowego
Każdy etap mieszania i emisji zawiera statystyki danych.
Przed uruchomieniem etapu lub po uruchomieniu etapu statystyki są szacowane w czasie kompilacji, a flaga isRuntime
to false
, na przykład: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
Po zakończeniu wykonywania etapu statystyki są zbierane w czasie wykonywania, a flaga isRuntime
stanie się true
, na przykład: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
DataFrame.explain
Oto przykład:
Przed wykonaniem
Podczas wykonywania
Po wykonaniu
SQL EXPLAIN
Węzeł AdaptiveSparkPlan
Zapytania zastosowane w usłudze AQE zawierają co najmniej jeden węzeł AdaptiveSparkPlan, zwykle jako węzeł główny każdego zapytania głównego lub podrzędnego zapytania.
Brak bieżącego planu
Ponieważ SQL EXPLAIN
nie wykonuje zapytania, bieżący plan jest zawsze taki sam jak plan początkowy i nie odzwierciedla tego, co ostatecznie zostanie wykonane przez usługę AQE.
Poniżej przedstawiono przykład objaśnienia sql:
Skuteczności
Plan zapytania zmieni się, jeśli co najmniej jedna optymalizacja AQE zacznie obowiązywać. Efekt tych optymalizacji AQE jest przedstawiony przez różnicę między bieżącymi i końcowymi planami oraz początkowym planem i określonymi węzłami planu w bieżących i końcowych planach.
Dynamicznie zmieniaj sprzężenia scalania sortowania w sprzężenia skrótów emisji: różne fizyczne węzły sprzężenia między bieżącym/końcowym planem a planem początkowym
Dynamiczne łączenie partycji: węzeł
CustomShuffleReader
z właściwościąCoalesced
Dynamiczne obsługa sprzężenia niesymetrycznego: węzeł
SortMergeJoin
z polemisSkew
ma wartość true.Dynamiczne wykrywanie i propagowanie pustych relacji: część (lub cała) planu jest zastępowana przez węzeł LocalTableScan z polem relacji jako puste.
Konfigurowanie
W tej sekcji:
- Włączanie i wyłączanie wykonywania zapytań adaptacyjnych
- Włączanie automatycznego mieszania zoptymalizowanego
- Dynamicznie zmieniaj sprzężenia scalania sortowania w sprzężenia skrótu emisji
- Dynamiczne łączenie partycji
- Dynamiczne obsługa sprzężenia niesymetrycznego
- Dynamiczne wykrywanie i propagowanie pustych relacji
Włączanie i wyłączanie wykonywania zapytań adaptacyjnych
Właściwości |
---|
spark.databricks.optimizer.adaptive.enabled Typ: Boolean Czy włączyć lub wyłączyć wykonywanie zapytań adaptacyjnych. Wartość domyślna: true |
Włączanie automatycznego mieszania zoptymalizowanego
Właściwości |
---|
spark.sql.shuffle.partitions Typ: Integer Domyślna liczba partycji do użycia podczas mieszania danych dla sprzężeń lub agregacji. Ustawienie wartości auto umożliwia automatyczną optymalizację mieszania, która automatycznie określa tę liczbę na podstawie planu zapytania i rozmiaru danych wejściowych zapytania.Uwaga: w przypadku przesyłania strumieniowego ze strukturą tej konfiguracji nie można zmienić między ponownymi uruchomieniami zapytania z tej samej lokalizacji punktu kontrolnego. Wartość domyślna: 200 |
Dynamicznie zmieniaj sprzężenia scalania sortowania w sprzężenia skrótu emisji
Właściwości |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold Typ: Byte String Próg wyzwalający przełączanie na sprzężenie rozgłaszane w czasie wykonywania. Wartość domyślna: 30MB |
Dynamiczne łączenie partycji
Właściwości |
---|
spark.sql.adaptive.coalescePartitions.enabled Typ: Boolean Czy włączyć lub wyłączyć łączenie partycji. Wartość domyślna: true |
spark.sql.adaptive.advisoryPartitionSizeInBytes Typ: Byte String Rozmiar docelowy po łączenia. Rozmiary partycji z połączeniem będą zbliżone do rozmiaru, ale nie większe niż ten rozmiar docelowy. Wartość domyślna: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize Typ: Byte String Minimalny rozmiar partycji po łączenia. Rozmiary partycji z połączeniem nie będą mniejsze niż ten rozmiar. Wartość domyślna: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum Typ: Integer Minimalna liczba partycji po łączeniach. Niezalecane, ponieważ jawne zastępowanie ustawień spark.sql.adaptive.coalescePartitions.minPartitionSize .Wartość domyślna: 2x nr. rdzeni klastra |
Dynamiczne obsługa sprzężenia niesymetrycznego
Właściwości |
---|
spark.sql.adaptive.skewJoin.enabled Typ: Boolean Czy włączyć lub wyłączyć obsługę sprzężenia niesymetrycznego. Wartość domyślna: true |
spark.sql.adaptive.skewJoin.skew.skewedPartitionFactor Typ: Integer Czynnik, który po pomnożony przez rozmiar partycji mediany przyczynia się do określenia, czy partycja jest niesymetryczna. Wartość domyślna: 5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes Typ: Byte String Próg, który przyczynia się do określenia, czy partycja jest niesymetryczna. Wartość domyślna: 256MB |
Partycja jest uważana za niesymetryczną, gdy zarówno , jak (partition size > skewedPartitionFactor * median partition size)
i (partition size > skewedPartitionThresholdInBytes)
są true
.
Dynamiczne wykrywanie i propagowanie pustych relacji
Właściwości |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled Typ: Boolean Czy włączyć lub wyłączyć propagację dynamicznej pustej relacji. Wartość domyślna: true |
Często zadawane pytania
W tej sekcji:
- Dlaczego AQE nie emitowała małej tabeli sprzężenia?
- Czy nadal należy używać wskazówki strategii dołączania emisji z włączoną funkcją AQE?
- Jaka jest różnica między niesymetryczną wskazówką sprzężenia a optymalizacją sprzężenia niesymetrycznego AQE? Którego z nich należy użyć?
- Dlaczego usługa AQE nie dostosowyła kolejności sprzężenia automatycznie?
- Dlaczego usługa AQE nie wykryła niesymetryczności danych?
Dlaczego AQE nie emitowała małej tabeli sprzężenia?
Jeśli rozmiar relacji oczekiwanej do emisji spadnie poniżej tego progu, ale nadal nie jest emitowany:
- Sprawdź typ sprzężenia. Emisja nie jest obsługiwana w przypadku niektórych typów sprzężeń, na przykład nie można rozgłasić lewej relacji obiektu
LEFT OUTER JOIN
. - Może to być również, że relacja zawiera wiele pustych partycji, w takim przypadku większość zadań może zakończyć się szybko za pomocą sprzężenia scalania sortowania lub może być potencjalnie zoptymalizowana z obsługą sprzężenia niesymetrycznego. AQE unika zmiany takich sprzężeń scalania sortowania w celu rozgłaszania sprzężeń skrótów, jeśli procent partycji niepustych jest niższy niż
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
.
Czy nadal należy używać wskazówki strategii dołączania emisji z włączoną funkcją AQE?
Tak. Statycznie planowane sprzężenie emisji jest zwykle bardziej wydajne niż dynamicznie planowane przez AQE, ponieważ AQE może nie przełączyć się na sprzężenie emisji, dopóki nie zostanie wykonane przetasowanie dla obu stron sprzężenia (w tym czasie rzeczywiste rozmiary relacji są uzyskiwane). Dlatego użycie wskazówki emisji może nadal być dobrym wyborem, jeśli dobrze znasz zapytanie. Funkcja AQE będzie przestrzegać wskazówek dotyczących zapytań w taki sam sposób, jak w przypadku optymalizacji statycznej, ale nadal może stosować optymalizacje dynamiczne, które nie mają wpływu na wskazówki.
Jaka jest różnica między niesymetryczną wskazówką sprzężenia a optymalizacją sprzężenia niesymetrycznego AQE? Którego z nich należy użyć?
Zaleca się poleganie na obsłudze sprzężenia niesymetrycznego AQE, a nie używania wskazówek sprzężenia niesymetrycznego, ponieważ sprzężenie AQE jest całkowicie automatyczne i ogólnie działa lepiej niż odpowiednik wskazówek.
Dlaczego usługa AQE nie dostosowyła kolejności sprzężenia automatycznie?
Zmiana kolejności sprzężeń dynamicznych nie jest częścią AQE.
Dlaczego usługa AQE nie wykryła niesymetryczności danych?
Istnieją dwa warunki rozmiaru, które muszą zostać spełnione, aby usługa AQE wykryła partycję jako niesymetryczną partycję:
- Rozmiar partycji jest większy niż
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(domyślnie 256 MB) - Rozmiar partycji jest większy niż mediana rozmiaru wszystkich partycji w czasie niesymetryczności współczynnika
spark.sql.adaptive.skewJoin.skewedPartitionFactor
partycji (wartość domyślna 5)
Ponadto obsługa niesymetryczności jest ograniczona dla niektórych typów sprzężenia, na przykład w LEFT OUTER JOIN
systemie można zoptymalizować tylko niesymetryczność po lewej stronie.
Starsza wersja
Termin "Wykonywanie adaptacyjne" istnieje od wersji Spark 1.6, ale nowa AQE na platformie Spark 3.0 jest zasadniczo inna. Jeśli chodzi o funkcjonalność, platforma Spark 1.6 wykonuje tylko część "dynamicznie łączą partycje". Jeśli chodzi o architekturę techniczną, nowa AQE to struktura dynamicznego planowania i ponownego planowania zapytań na podstawie statystyk środowiska uruchomieniowego, która obsługuje różne optymalizacje, takie jak te opisane w tym artykule i można rozszerzyć, aby umożliwić bardziej potencjalne optymalizacje.