Ejecución de consultas adaptables
La ejecución de consultas adaptables (AQE) es la re-optimización de consultas que se produce durante la ejecución de la consulta.
La motivación para la re-optimización en tiempo de ejecución es que Azure Databricks tiene las estadísticas más precisas actualizadas al final de un intercambio aleatorio y de difusión (denominado fase de consulta en AQE). Como resultado, Azure Databricks puede optar por una mejor estrategia física, elegir un número y un tamaño de partición óptimos posteriores al orden aleatorio, o realizar optimizaciones para las que antes era necesario sugerencias, por ejemplo, para el control de la combinación de sesgo.
Esto puede ser muy útil cuando la recopilación de estadísticas no está activada o cuando las estadísticas están obsoletas. También es útil en lugares donde las estadísticas derivadas de forma estática son inexactas, como en medio de una consulta complicada, o después de la aparición de asimetría de datos.
Funcionalidades
AQE está habilitado de forma predeterminada. Tiene cuatro características principales:
- Cambia dinámicamente la fusión mediante combinación de ordenación a combinación de hash de difusión.
- Fusiona las particiones de manera dinámica (las particiones pequeñas en otras de tamaño razonable) después del intercambio aleatorio. Las tareas muy pequeñas tienen un rendimiento de E/S peor y tienden a sufrir más por la sobrecarga de programación y de configuración de tareas. La combinación de tareas pequeñas ahorra recursos y mejora el rendimiento del clúster.
- Controla dinámicamente el sesgo en la fusión mediante combinación de ordenación y la combinación de hash aleatoria mediante la división (y replicación si es necesario) de las tareas sesgadas en tareas de tamaño uniforme aproximadamente.
- Detecta y propaga dinámicamente relaciones vacías.
Application
AQE se aplica a todas las consultas que:
- No son de streaming
- Contienen al menos un intercambio (normalmente cuando hay una combinación, un agregado o una ventana), una subconsulta o las dos cosas.
No todas las consultas con AQE aplicado se vuelven a optimizar necesariamente. Es posible que la re-optimización genere un plan de consulta diferente al compilado de manera estática. Para determinar si AQE ha cambiado el plan de una consulta, vea la sección siguiente, Planes de consulta.
Planes de consulta
En esta sección se describe cómo puede examinar los planes de consulta de distintas maneras.
En esta sección:
Interfaz de usuario de Spark
Nodo AdaptiveSparkPlan
Las consultas aplicadas a AQE contienen uno o varios nodos AdaptiveSparkPlan
, normalmente como el nodo raíz de cada consulta principal o subconsulta.
Antes de que se ejecute la consulta o cuando esté en ejecución, la marca isFinalPlan
del nodo AdaptiveSparkPlan
correspondiente se muestra como false
; una vez que se completa la ejecución de la consulta, la marca isFinalPlan
cambia a true.
.
Plan en evolución
El diagrama del plan de consulta evoluciona a medida que avanza la ejecución y refleja el plan más actual que se ejecuta. Los nodos que ya se han ejecutado (en los que hay métricas disponibles) no cambiarán, pero los que no lo han hecho pueden cambiar en el tiempo como resultado de las optimizaciones.
A continuación se muestra un ejemplo de diagrama de un plan de consulta:
DataFrame.explain()
Nodo AdaptiveSparkPlan
Las consultas aplicadas a AQE contienen uno o varios nodos AdaptiveSparkPlan
, normalmente como el nodo raíz de cada consulta principal o subconsulta. Antes de que se ejecute la consulta o cuando esté en ejecución, la marca isFinalPlan
del nodo AdaptiveSparkPlan
correspondiente se muestra como false
; una vez que se completa la ejecución de la consulta, la marca isFinalPlan
cambia a true
.
Plan actual e inicial
En cada nodo AdaptiveSparkPlan
se mostrará tanto el plan inicial (el plan antes de aplicar las optimizaciones de AQE) como el plan actual o final, en función de si la ejecución se ha completado. El plan actual evolucionará a medida que avance la ejecución.
Estadísticas en tiempo de ejecución
Cada fase de orden aleatorio y difusión contiene estadísticas de datos.
Antes de que se ejecute la fase o cuando está en ejecución, las estadísticas son estimaciones en tiempo de compilación y la marca isRuntime
es false
, por ejemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
.
Una vez que se completa la ejecución de la fase, las estadísticas son las recopiladas en tiempo de ejecución y la marca isRuntime
se convertirá en true
, por ejemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
.
Este es un ejemplo de DataFrame.explain
:
Antes de la ejecución
Durante la ejecución
Después de la ejecución
SQL EXPLAIN
Nodo AdaptiveSparkPlan
Las consultas con AQE aplicado contienen uno o varios nodos AdaptiveSparkPlan, normalmente como el nodo raíz de cada consulta principal o subconsulta.
No hay ningún plan actual
Como SQL EXPLAIN
no ejecuta la consulta, el plan actual siempre es el mismo que el plan inicial y no refleja lo que AQE ejecutaría finalmente.
A continuación se muestra un ejemplo de explain en SQL:
Eficacia
El plan de consulta cambiará si una o varias optimizaciones de AQE tienen efecto. El efecto de estas optimizaciones de AQE se muestra en la diferencia entre los planes actual y final, y el plan inicial y los nodos de plan específicos de los planes actuales y finales.
Cambio dinámico de la fusión mediante combinación de ordenación en combinación de hash de difusión: distintos nodos de combinación física entre el plan actual o final, y el plan inicial
Fusión dinámica de particiones: nodo
CustomShuffleReader
con la propiedadCoalesced
Control dinámico de la combinación de sesgo: nodo
SortMergeJoin
con el campoisSkew
como true.Detección y propagación dinámica de relaciones vacías: el nodo LocalTableScan reemplaza parte del plan (o todo) con el campo de relación como vacío.
Configuración
En esta sección:
- Habilitación y deshabilitación de la ejecución de consultas adaptables
- Habilitación del orden aleatorio de optimización automática
- Cambio dinámico de la fusión mediante combinación de ordenación a combinación de hash de difusión
- Fusión dinámica de particiones
- Control dinámico de la combinación de sesgo
- Detección y propagación dinámica de relaciones vacías
Habilitación y deshabilitación de la ejecución de consultas adaptables
Propiedad |
---|
spark.databricks.optimizer.adaptive.enabled Escriba: Boolean Indica si se habilita o deshabilita la ejecución de consultas adaptables. Valor predeterminado: true |
Habilitación del orden aleatorio de optimización automática
Propiedad |
---|
spark.sql.shuffle.partitions Escriba: Integer Número predeterminado de particiones que se van a usar al ordenar aleatoriamente datos para combinaciones o agregaciones. Al establecer el valor auto , se habilitará el orden aleatorio de optimización automática, que determina automáticamente este número en función del plan de consulta y del tamaño de los datos de entrada de la consulta.Nota: para flujos estructurados, esta configuración no se puede cambiar entre reinicios de consulta desde la misma ubicación del punto de control. Valor predeterminado: 200 |
Cambio dinámico de la fusión mediante combinación de ordenación a combinación de hash de difusión
Propiedad |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold Escriba: Byte String Umbral para desencadenar el cambio a la combinación de difusión en tiempo de ejecución. Valor predeterminado: 30MB |
Fusión dinámica de particiones
Propiedad |
---|
spark.sql.adaptive.coalescePartitions.enabled Escriba: Boolean Si se habilita o deshabilita la fusión de particiones. Valor predeterminado: true |
spark.sql.adaptive.advisoryPartitionSizeInBytes Escriba: Byte String Tamaño de destino después de la fusión. Los tamaños de partición fusionados tendrá un valor cercano, pero no más grandes que este tamaño de destino. Valor predeterminado: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize Escriba: Byte String Tamaño mínimo de las particiones después de la fusión. Los tamaños de partición fusionados no serán menores que este tamaño. Valor predeterminado: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum Escriba: Integer Número mínimo de particiones después de la fusión. No se recomienda, porque el valor lo invalida explícitamente spark.sql.adaptive.coalescePartitions.minPartitionSize .Valor predeterminado: el doble del número de núcleos de clúster |
Control dinámico de la combinación de sesgo
Propiedad |
---|
spark.sql.adaptive.skewJoin.enabled Escriba: Boolean Si se habilita o deshabilita el control de la combinación de sesgo. Valor predeterminado: true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor Escriba: Integer Factor que, cuando se multiplica por el tamaño medio de la partición, contribuye a determinar si una partición está sesgada. Valor predeterminado: 5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes Escriba: Byte String Umbral que contribuye a determinar si una partición está sesgada. Valor predeterminado: 256MB |
Una partición se considera sesgada cuando (partition size > skewedPartitionFactor * median partition size)
y (partition size > skewedPartitionThresholdInBytes)
son true
.
Detección y propagación dinámica de relaciones vacías
Propiedad |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled Escriba: Boolean Si se habilita o deshabilita la propagación dinámica de relaciones vacías. Valor predeterminado: true |
Preguntas más frecuentes
En esta sección:
- ¿Por qué AQE no ha difundido una tabla de combinación pequeña?
- ¿Se debe seguir usando una sugerencia de estrategia de combinación de difusión con AQE habilitado?
- ¿Cuál es la diferencia entre la sugerencia de combinación de sesgo y la optimización de combinación de sesgo de AQE? ¿Cuál debo usar?
- ¿Por qué AQE no ha ajustado el orden de combinación automáticamente?
- ¿Por qué AQE no ha detectado la asimetría de datos?
¿Por qué AQE no ha difundido una tabla de combinación pequeña?
Si el tamaño de la relación que se espera que se difunda se encuentra por debajo de este umbral, pero todavía no se difunde:
- Compruebe el tipo de combinación. La difusión no se admite para determinados tipos de combinación, por ejemplo, la relación izquierda de una operación
LEFT OUTER JOIN
no se puede difundir. - También es posible que la relación contenga una gran cantidad de particiones vacías, en cuyo caso la mayoría de las tareas pueden finalizar rápidamente con la fusión mediante combinación de ordenación o se puede optimizar con el control de la combinación sesgada. AQE evita cambiar estas fusiones mediante combinación de ordenación por combinaciones de hash de difusión si el porcentaje de particiones no vacías es inferior a
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
.
¿Se debe seguir usando una sugerencia de estrategia de combinación de difusión con AQE habilitado?
Sí. Una combinación de difusión planeada estáticamente suele ser más efectiva que una planeada de forma dinámica por parte de AQE, ya que es posible que AQE no cambie a la combinación de difusión hasta después de realizar el orden aleatorio para ambos lados de la combinación (momento en el que se obtienen los tamaños de relación reales). Por tanto, el uso de una sugerencia de difusión puede seguir siendo una buena opción si conoce bien la consulta. AQE respetará las sugerencias de consulta igual que lo hace la optimización estática, pero todavía puede aplicar optimizaciones dinámicas que no se ven afectadas por las sugerencias.
¿Cuál es la diferencia entre la sugerencia de combinación de sesgo y la optimización de combinación de sesgo de AQE? ¿Cuál debo usar?
Se recomienda confiar en el control de la combinación de sesgo de AQE en lugar de usar la sugerencia de combinación de sesgo, ya que la combinación de sesgo de AQE es completamente automática y, en general, funciona mejor que la de la sugerencia.
¿Por qué AQE no ha ajustado el orden de combinación automáticamente?
La reordenación de combinaciones dinámicas no forma parte de AQE.
¿Por qué AQE no ha detectado la asimetría de datos?
Hay dos condiciones de tamaño que se deben cumplir para que AQE detecte una partición como una partición sesgada:
- El tamaño de la partición es mayor que
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(el valor predeterminado es de 256 MB). - El tamaño de la partición es mayor que el tamaño medio de todas las particiones multiplicado por el factor de partición sesgada
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(el valor predeterminado es 5)
Además, la compatibilidad con el control del sesgo está limitada para determinados tipos de combinación, por ejemplo, en LEFT OUTER JOIN
solo se puede optimizar el sesgo del lado izquierdo.
Heredado
El término "Ejecución adaptable" existe desde Spark 1.6, pero el nuevo AQE en Spark 3.0 es fundamentalmente diferente. En términos de funcionalidad, Spark 1.6 solo realiza la parte de "fusión dinámica de particiones". En términos de arquitectura técnica, el nuevo AQE es un marco de planificación y replanificación dinámicas de consultas basado en estadísticas en tiempo de ejecución, que admite una variedad de optimizaciones como las que se han descrito en este artículo y que se puede ampliar para habilitar más optimizaciones.