Adaptacyjne wykonywanie zapytań
Wykonywanie zapytań adaptacyjnych (AQE) to ponowna optymalizacja zapytań wykonywana podczas wykonywania zapytania.
Motywacją do ponownej optymalizacji środowiska uruchomieniowego jest to, że usługa Azure Databricks ma najwięcej up-to— dokładne statystyki na końcu wymiany mieszania i emisji (określane jako etap zapytania w AQE). W związku z tym usługa Azure Databricks może wybrać lepszą strategię fizyczną, ustalić optymalny rozmiar i liczbę po przetasowaniu partition lub przeprowadzić optymalizacje, które wcześniej wymagały wskazówek, na przykład obsługę skośności join.
Może to być bardzo przydatne, gdy zbieranie statystyk nie jest włączone lub gdy statystyki są nieaktualne. Jest to również przydatne w miejscach, where statycznie pochodne statystyki są niedokładne, takie jak w środku skomplikowanego zapytania lub po wystąpieniu niesymetryczności danych.
Możliwości
Usługa AQE jest domyślnie włączona. Ma 4 główne funkcje:
- Dynamicznie zmienia scalanie sortujące join na rozgłoszony hasz join.
- Dynamicznie scala partycje (łączenie małych partycji w większe) po wymianie danych. Bardzo małe zadania mają gorszą przepływność operacji we/wy i mają tendencję do większego obciążenia związanego z planowaniem i konfiguracją zadań. Łączenie małych zadań pozwala zaoszczędzić zasoby i zwiększyć przepływność klastra.
- Dynamicznie obsługuje niesymetryczność w scalaniu sortowania join i mieszania skrótów join przez podzielenie (i replikowanie w razie potrzeby) niesymetrycznych zadań na mniej więcej zadania o równomiernym rozmiarze.
- Dynamicznie wykrywa i propaguje puste relacje.
Aplikacja
Usługa AQE ma zastosowanie do wszystkich zapytań, które są następujące:
- Brak przesyłania strumieniowego
- Zawiera co najmniej jedną wymianę (zwykle w przypadku join, agregatu lub window), jedno podzapytanie lub oba te elementy.
Nie wszystkie zapytania stosowane w usłudze AQE muszą zostać ponownie zoptymalizowane. Ponowna optymalizacja może lub nie może wymyślić innego planu zapytania niż statycznie skompilowany. Aby ustalić, czy plan zapytania został zmieniony przez usługę AQE, zobacz następującą sekcję Plany zapytań.
Plany zapytań
W tej sekcji omówiono sposób analizowania planów zapytań na różne sposoby.
W tej sekcji:
Interfejs użytkownika platformy Spark
węzeł AdaptiveSparkPlan
Zapytania zastosowane w usłudze AQE zawierają co najmniej jeden węzeł AdaptiveSparkPlan
, zwykle jako węzeł główny każdego zapytania głównego lub zapytania podrzędnego.
Przed uruchomieniem zapytania lub jego uruchomieniu flaga isFinalPlan
odpowiedniego węzła AdaptiveSparkPlan
jest wyświetlana jako false
; po zakończeniu wykonywania zapytania flaga isFinalPlan
zmieni się na true.
Ewoluujący plan
Diagram planu zapytania ewoluuje wraz z postępem wykonywania i odzwierciedla najbardziej bieżący plan, który jest wykonywany. Węzły, które zostały już wykonane (w których metryki są dostępne), nie ulegną zmianie, ale te, które nie zostały jeszcze wykonane, mogą się zmieniać z czasem w wyniku ponownej optymalizacji.
Poniżej przedstawiono przykład diagramu planu zapytania:
diagram zapytań planu
DataFrame.explain()
węzeł AdaptiveSparkPlan
Zapytania zastosowane w usłudze AQE zawierają co najmniej jeden węzeł AdaptiveSparkPlan
, zwykle jako węzeł główny każdego zapytania głównego lub zapytania podrzędnego. Przed uruchomieniem zapytania lub jego uruchomieniu flaga isFinalPlan
odpowiedniego węzła AdaptiveSparkPlan
jest wyświetlana jako false
; po zakończeniu wykonywania zapytania flaga isFinalPlan
zmieni się na true
.
Bieżący i początkowy plan
W każdym węźle AdaptiveSparkPlan
będzie istnieć zarówno plan początkowy (plan przed zastosowaniem optymalizacji AQE), jak i bieżący lub końcowy plan, w zależności od tego, czy wykonanie zostało ukończone. Bieżący plan będzie ewoluował w miarę postępu wykonywania.
Statystyki środowiska uruchomieniowego
Każdy etap mieszania i emisji zawiera statystyki danych.
Przed uruchomieniem etapu lub po uruchomieniu etapu statystyki są szacowane w czasie kompilacji, a flaga isRuntime
jest false
, na przykład: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
Po zakończeniu wykonywania etapu, statystyki to te zebrane w czasie wykonywania, a flaga isRuntime
zmieni się na true
, na przykład: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
.
Poniżej przedstawiono przykład DataFrame.explain
:
Przed wykonaniem
Podczas wykonywania
Po wykonaniu
SQL EXPLAIN
węzeł AdaptiveSparkPlan
Zapytania stosujące AQE zawierają co najmniej jeden węzeł AdaptiveSparkPlan, zazwyczaj jako węzeł główny każdego głównego zapytania lub podzapytania.
Brak bieżącego planu
Ponieważ SQL EXPLAIN
nie wykonuje zapytania, bieżący plan jest zawsze taki sam jak plan początkowy i nie odzwierciedla tego, co ostatecznie get wykonywane przez AQE.
Poniżej przedstawiono przykład SQL explain:
Skuteczność
Plan zapytania zmieni się, jeśli co najmniej jedna optymalizacja AQE wejdzie w życie. Efekt tych optymalizacji AQE jest przedstawiony przez różnicę między bieżącymi i końcowymi planami oraz początkowym planem i określonymi węzłami planu w bieżących i końcowych planach.
Dynamicznie zmień scalanie sortowania join na hasz rozgłoszeniowy join: różne fizyczne węzły join między bieżącym/końcowym planem a planem początkowym.
Dynamiczne łączenie partycji: węzeł
CustomShuffleReader
z właściwościąCoalesced
Dynamiczne przetwarzanie odchylenia join: węzeł
SortMergeJoin
z polemisSkew
jako prawdziwe.Dynamiczne wykrywanie i propagowanie pustych relacji: część (lub całość) planu jest zastępowana przez węzeł LocalTableScan z pustym polem relacji.
Konfiguracja
W tej sekcji:
- włączanie i wyłączanie wykonywania zapytań adaptacyjnych
- Włącz zoptymalizowane automatyczne mieszanie
- dynamicznie zmienia scalanie posortowanych danych join na rozgłaszany skrót join
- dynamicznie łączą partycje
- Dynamiczna obsługa skośności join
- Dynamicznie wykrywaj i propaguj puste relacje
Włączanie i wyłączanie wykonywania zapytań adaptacyjnych
Własność |
---|
spark.databricks.optimizer.adaptive.enabled Typ: Boolean Czy włączyć lub wyłączyć wykonywanie zapytań adaptacyjnych. Wartość domyślna: true |
Włącz automatycznie zoptymalizowane mieszanie
Własność |
---|
spark.sql.shuffle.partitions Typ: Integer Domyślna liczba partycji do użycia podczas mieszania danych dla sprzężeń lub agregacji. Ustawienie wartości auto umożliwia automatyczną optymalizację mieszania, która automatycznie określa tę liczbę na podstawie planu zapytania i rozmiaru danych wejściowych zapytania.Uwaga: w przypadku przesyłania strumieniowego ze strukturą tej konfiguracji nie można zmienić między ponownymi uruchomieniami zapytania z tej samej lokalizacji punktu kontrolnego. Wartość domyślna: 200 |
Dynamicznie zmieniaj scalanie sortowania join na join skrótów emisji
Własność |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold Typ: Byte String Próg wyzwalający przełączenie na emisję join podczas działania. Wartość domyślna: 30MB |
Dynamiczne łączenie partycji
Własność |
---|
spark.sql.adaptive.coalescePartitions.enabled Typ: Boolean Czy włączyć lub wyłączyć łączenie partition. Wartość domyślna: true |
spark.sql.adaptive.advisoryPartitionSizeInBytes Typ: Byte String Rozmiar docelowy po połączeniu. Połączone rozmiary partition będą bliskie, ale nie przekraczające tego rozmiaru docelowego. Wartość domyślna: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize Typ: Byte String Minimalny rozmiar partycji po scaleniu. Połączone rozmiary partition nie będą mniejsze niż ten rozmiar. Wartość domyślna: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum Typ: Integer Minimalna liczba partycji po scaleniu. Nie jest zalecane, ponieważ jawne ustawienie nadpisuje wcześniejsze ustawienia. spark.sql.adaptive.coalescePartitions.minPartitionSize .Wartość domyślna: 2x nr. rdzeni klastra |
Dynamiczna obsługa odchylenia join
Własność |
---|
spark.sql.adaptive.skewJoin.enabled Typ: Boolean Czy włączyć lub wyłączyć obsługę nachylenia join. Wartość domyślna: true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor Typ: Integer Czynnik, który po pomnożeniu przez medianę rozmiaru partition przyczynia się do określenia, czy partition jest niesymetryczny. Wartość domyślna: 5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes Typ: Byte String Próg, który przyczynia się do określenia, czy partition jest niesymetryczny. Wartość domyślna: 256MB |
partition jest uważany za niesymetryczny, gdy (partition size > skewedPartitionFactor * median partition size)
i (partition size > skewedPartitionThresholdInBytes)
są true
.
Dynamiczne wykrywanie i propagowanie pustych relacji
Własność |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled Typ: Boolean Czy włączyć lub wyłączyć dynamiczne rozpowszechnianie pustych relacji. Wartość domyślna: true |
Często zadawane pytania
W tej sekcji:
- Dlaczego AQE nie emitowała małego jointable?
- czy nadal należy używać wskazówki strategii join emisji z włączoną funkcją AQE?
- Jaka jest różnica między wskazówką skew join a optymalizacją AQE skew join? Którego z nich należy użyć?
- Dlaczego AQE nie dostosowało automatycznie mojego zamawiania join?
- Dlaczego usługa AQE nie wykryła niesymetryczności danych?
Dlaczego AQE nie emitowała małego jointable?
Jeśli rozmiar relacji, która ma być nadana, jest poniżej tego progu, ale nadal nie jest nadawana:
- Sprawdź typ join. Emisja nie jest obsługiwana w przypadku niektórych typów join, na przykład lewej relacji
LEFT OUTER JOIN
nie można rozgłaszać. - Może się również zdarzyć, że relacja zawiera wiele pustych partycji, w takim przypadku większość zadań może szybko zakończyć się sortowaniem przez scalanie join lub potencjalnie można ją zoptymalizować poprzez obsługę przechyłu join. AQE unika zmiany takich sortowanych sprzężeń scalających na rozgłaszanie połączeń z haszowaniem, jeśli wartość procentowa partycji niepustych jest niższa niż
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
.
Czy nadal należy używać wskazówki strategii emisji join z włączoną funkcją AQE?
Tak. Statycznie planowana emisja join jest zwykle bardziej wydajna niż dynamicznie planowana przez AQE, ponieważ AQE może nie przełączyć się na emisję join, dopóki nie wykona mieszania dla obu stron join (w tym czasie rzeczywiste rozmiary relacji są uzyskiwane). Dlatego użycie wskazówki emisji może nadal być dobrym wyborem, jeśli dobrze znasz zapytanie. AQE będzie przestrzegać wskazówek dotyczących zapytań w taki sam sposób jak optymalizacja statyczna, ale nadal może stosować optymalizacje dynamiczne, na które wskazówki nie mają wpływu.
Jaka jest różnica pomiędzy przesunięciem join jako wskazówką a optymalizacją przesunięcia AQE join? Którego z nich należy użyć?
Zaleca się poleganie na obsługiwaniu przechylenia AQE join zamiast używać wskazówki dotyczącej przechylenia join, ponieważ przechylenie AQE join jest całkowicie automatyczne i zwykle działa lepiej niż wskazówka.
Dlaczego AQE nie dostosowało mojego zamówienia join automatycznie?
Dynamiczne porządkowanie join nie jest częścią AQE.
Dlaczego usługa AQE nie wykryła niesymetryczności danych?
Istnieją dwa warunki rozmiaru, które muszą zostać spełnione, aby AQE wykryło partition jako skośne partition:
- Rozmiar partition jest większy niż
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(domyślnie 256 MB) - Rozmiar partition jest większy niż mediana rozmiarów wszystkich partycji pomnożona przez współczynnik niesymetryczności partition
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(wartość domyślna 5).
Ponadto obsługa przechyłu jest ograniczona dla niektórych typów join, na przykład w LEFT OUTER JOIN
można zoptymalizować przechył tylko po lewej stronie.
Spuścizna
Termin "Wykonywanie adaptacyjne" istnieje od wersji Spark 1.6, ale nowa AQE na platformie Spark 3.0 jest zasadniczo inna. Pod względem funkcjonalności, Spark 1.6 wykonuje tylko dynamiczne scalanie partycji. Jeśli chodzi o architekturę techniczną, nowa AQE to struktura dynamicznego planowania i ponownego planowania zapytań na podstawie statystyk środowiska uruchomieniowego, która obsługuje różne optymalizacje, takie jak te opisane w tym artykule i można rozszerzyć, aby umożliwić bardziej potencjalne optymalizacje.