Execução de consulta adaptável
A AQE (execução de consulta adaptável) é a reotimização da consulta que ocorre durante a execução da consulta.
A motivação para a reotimização do tempo de execução é que o Azure Databricks tem as estatísticas precisas mais atualizadas no final de um intercâmbio de ordem aleatória e de difusão (chamado de estágio de consulta na AQE). Como resultado, o Azure Databricks pode optar por uma estratégia física melhor, escolher o tamanho e o número da partição de ordem aleatória ideal, ou as otimizações usadas para exigir dicas, por exemplo, manipulação de junção de inclinação.
Isso poderá ser muito útil quando a coleta de estatísticas não estiver ativada ou quando as estatísticas estiverem obsoletas. Também será útil em locais onde estatísticas derivadas estaticamente são imprecisas, como no meio de uma consulta complicada ou após a ocorrência de distorção de dados.
Funcionalidades
O AQE está habilitado por padrão. Ela tem quatro recursos principais:
- Altera dinamicamente a junção de mesclagem de classificação na junção hash de difusão.
- Une partições dinamicamente (combine partições pequenas em partições de tamanho razoável) após a troca em ordem aleatória. As tarefas muito pequenas têm pior taxa de transferência de E/S e tendem a sofrer mais do que o agendamento da sobrecarga e da sobrecarga da configuração da tarefa. A combinação de tarefas pequenas salva recursos e melhora a taxa de transferência do cluster.
- Manipula dinamicamente a distorção em classificar junção de mesclagem e junção hash aleatória dividindo (e replicando se necessário) tarefas distorcidas em tarefas de tamanho quase uniforme.
- Detecta e propaga dinamicamente as relações vazias.
Aplicativo
A AQE aplica-se a todas as consultas:
- Não streaming
- Que contêm pelo menos uma troca (geralmente, quando há uma junção, agregação ou janela), uma subconsulta ou ambas.
Nem todas as consultas aplicadas à AQE são necessariamente reotimizadas. A nova otimização pode ou não surgir com um plano de consulta diferente daquele compilado estaticamente. Para determinar se o plano de uma consulta foi alterado por AQE, confira a seção a seguir, Planos de consulta.
Planos de consultas
Esta seção discute como você pode examinar os planos de consulta de maneiras diferentes.
Nesta seção:
Interface do usuário do Spark
Nó AdaptiveSparkPlan
As consultas aplicadas à AQE contêm um ou mais nós AdaptiveSparkPlan
, geralmente como o nó raiz de cada consulta principal ou subconsulta.
Antes que a consulta seja executada ou quando estiver em execução, o sinalizador isFinalPlan
do nó AdaptiveSparkPlan
correspondente será mostrado como false
; depois que a execução da consulta for concluída, o sinalizador isFinalPlan
será alterado para true.
Plano em evolução
O diagrama do plano de consulta evolui à medida que a execução progride e reflete o plano mais atual que está sendo executado. Os nós que já foram executados (nos quais há métricas disponíveis) não serão alterados, mas aqueles que não podem mudar ao longo do tempo como resultado de reotimizações.
Este é um exemplo de diagrama de plano de consulta:
DataFrame.explain()
Nó AdaptiveSparkPlan
As consultas aplicadas à AQE contêm um ou mais nós AdaptiveSparkPlan
, geralmente como o nó raiz de cada consulta principal ou subconsulta. Antes que a consulta seja executada ou quando estiver em execução, o sinalizador isFinalPlan
do nó AdaptiveSparkPlan
correspondente será mostrado como false
; depois que a execução da consulta for concluída, o sinalizador isFinalPlan
será alterado para true
.
Plano atual e inicial
Em cada nó AdaptiveSparkPlan
, haverá o plano inicial (o plano antes de aplicar qualquer otimização da AQE) e o plano atual ou final, dependendo se a execução foi concluída. O plano atual evoluirá à medida que a execução for progredida.
Estatísticas de tempo de execução
Cada estágio de difusão e ordem aleatória contém estatísticas de dados.
Antes que o estágio seja executado ou quando o estágio estiver em execução, as estatísticas serão estimativas de tempo de compilação e o sinalizador isRuntime
será false
, por exemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
Depois que a execução do estágio for concluída, as estatísticas serão aquelas coletadas em tempo de execução e o sinalizador isRuntime
se tornará true
, por exemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
A seguir, é mostrado um exemplo de DataFrame.explain
:
Antes da execução
Durante a execução
Após a execução
SQL EXPLAIN
Nó AdaptiveSparkPlan
As consultas aplicadas à AQE contêm um ou mais nós AdaptiveSparkPlan, geralmente como o nó raiz de cada consulta principal ou subconsulta.
Sem plano atual
Como SQL EXPLAIN
não executa a consulta, o plano atual é sempre o mesmo que o plano inicial e não reflete o que eventualmente seria executado pela AQE.
A seguir, um exemplo explicado por SQL:
Eficácia
O plano de consulta será alterado se uma ou mais otimizações da AQE entrarem em vigor. O efeito dessas otimizações da AQE é demonstrado pela diferença entre os planos atual e final e os nós de plano inicial e plano específico nos planos atual e final.
Alterar dinamicamente a junção de mesclagem de classificação em junção hash de difusão: nós de junção físicas diferentes entre o plano atual/final e o plano inicial
Unindo partições dinamicamente: nó
CustomShuffleReader
com a propriedadeCoalesced
Manipule dinamicamente a junção de inclinação: nó
SortMergeJoin
com campoisSkew
como verdadeiro.Detectar e propagar dinamicamente relações vazias: parte de (ou todo) o plano é substituído pelo nó LocalTableScan com o campo de relação como vazio.
Configuração
Nesta seção:
- Habilitar e desabilitar a execução de consulta adaptável
- Habilitar o embaralhamento otimizado automaticamente
- Altera dinamicamente a junção de mesclagem de classificação na junção hash de difusão
- Unir partições dinamicamente
- Manipular dinamicamente a junção de distorção
- Detectar e propagar dinamicamente relações vazias
Habilitar e desabilitar a execução de consulta adaptável
Propriedade |
---|
spark.databricks.optimizer.adaptive.enabled Digite: Boolean Se a execução da consulta adaptável deve ser habilitada ou desabilitada. Valor padrão: true |
Habilitar o embaralhamento otimizado automaticamente
Propriedade |
---|
spark.sql.shuffle.partitions Digite: Integer O número padrão de partições a serem usadas ao embaralhar dados para junções ou agregações. A configuração do valor auto habilita a ordem aleatória com otimização automática, que determina automaticamente esse número com base no plano de consulta e no tamanho dos dados de entrada da consulta.Observação: para o Fluxo Estruturado, essa configuração não pode ser alterada entre as reinicializações de consulta do mesmo local de ponto de verificação. Valor padrão: 200 |
Alterar dinamicamente a junção de mesclagem de classificação em junção hash de difusão
Propriedade |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold Digite: Byte String O limite para disparar a alternância para a junção de difusão no tempo de execução. Valor padrão: 30MB |
Unir partições dinamicamente
Propriedade |
---|
spark.sql.adaptive.coalescePartitions.enabled Digite: Boolean Se a união de partição deve ser habilitada ou desabilitada. Valor padrão: true |
spark.sql.adaptive.advisoryPartitionSizeInBytes Digite: Byte String O tamanho do destino após a união. Os tamanhos de partição unidas serão próximos, mas não maiores que esse tamanho de destino. Valor padrão: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize Digite: Byte String O tamanho mínimo de partições após a união. Os tamanhos de partição unidas não serão menores do que esse tamanho. Valor padrão: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum Digite: Integer O tamanho mínimo de partições após a união. Não recomendado, porque a configuração substitui explicitamente spark.sql.adaptive.coalescePartitions.minPartitionSize .Valor padrão: 2x o número de núcleos do cluster |
Manipular dinamicamente a junção de distorção
Propriedade |
---|
spark.sql.adaptive.skewJoin.enabled Digite: Boolean Se a manipulação de junção de distorção deve ser habilitada ou desabilitada. Valor padrão: true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor Digite: Integer Um fator que, quando multiplicado pelo tamanho da partição mediana, contribui para determinar se uma partição está distorcida. Valor padrão: 5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes Digite: Byte String Um limite que contribui para determinar se uma partição é distorcida. Valor padrão: 256MB |
Uma partição é considerada distorcida quando (partition size > skewedPartitionFactor * median partition size)
e (partition size > skewedPartitionThresholdInBytes)
são true
.
Detectar e propagar relações vazias dinamicamente
Propriedade |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled Digite: Boolean Se a propagação de relação vazia dinâmica deve ser habilitada ou desabilitada. Valor padrão: true |
Perguntas frequentes (FAQ)
Nesta seção:
- Por que a AQE não transmitiu uma pequena tabela de junção?
- Ainda devo usar uma dica de estratégia de junção de difusão com a AQE habilitada?
- Qual é a diferença entre a dica de junção de distorção e a otimização de junção de distorção do AQE? Qual deles devo usar?
- Por que a AQE não ajusta minha ordenação de junção automaticamente?
- Por que a AQE não detectou minha distorção de dados?
Por que a AQE não transmitiu uma pequena tabela de junção?
Se o tamanho da relação esperada para difusão estiver abaixo desse limite, mas ainda não for transmitido:
- Verifique o tipo de junção. Não há suporte para a transmissão para determinados tipos de junção, por exemplo, a relação à esquerda de um
LEFT OUTER JOIN
não pode ser transmitida. - Também pode ser que a relação contenha muitas partições vazias; nesse caso, a maioria das tarefas pode ser concluída rapidamente com a junção de mesclagem de classificação ou potencialmente pode ser otimizada com manipulação de junção de distorção. A AQE evitará alterar essas junções de mesclagem de classificação para junções de hash de difusão se o percentual de partições não vazias for menor que
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
.
Ainda devo usar uma dica de estratégia de junção de difusão com a AQE habilitada?
Sim. Uma junção de difusão planejada estaticamente geralmente é mais bem-desempenho do que uma planejada dinamicamente pelo AQE, pois o AQE pode não mudar para a junção de difusão até depois de executar o embaralhamento para ambos os lados da junção (quando os tamanhos reais da relação são obtidos). Portanto, usar uma dica de difusão ainda poderá ser uma boa opção se você conhecer bem a consulta. A AQE respeitará as dicas de consulta da mesma maneira que a otimização estática, mas ainda poderá aplicar otimizações dinâmicas que não são afetadas pelas dicas.
Qual é a diferença entre a dica de junção de distorção e a otimização de junção de distorção do AQE? Qual deles devo usar?
É recomendável contar com o tratamento de junção de distorção da AQE em vez de usar a dica de junção de distorção, pois a junção de distorção da AQE é completamente automática e, em geral, tem um desempenho melhor do que a contraparte de dica.
Por que a AQE não ajusta minha ordenação de junção automaticamente?
A reordenação dinâmica de junções não faz parte do AQE.
Por que a AQE não detectou minha distorção de dados?
Há duas condições de tamanho que devem ser atendidas para que a AQE detecte uma partição como uma partição distorcida:
- O tamanho da partição é superior a
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(o padrão é 256 MB) - O tamanho da partição é maior que o tamanho mediano de todas as partições vezes o fator de partição distorcida
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(o padrão é 5)
Além disso, o suporte ao tratamento de distorção é limitado para determinados tipos de junção, por exemplo, no LEFT OUTER JOIN
, somente distorção no lado esquerdo pode ser otimizada.
Herdada
O termo "Execução Adaptável" existe desde o Spark 1.6, mas a nova AQE no Spark 3.0 é fundamentalmente diferente. Em termos de funcionalidade, o Spark 1.6 faz apenas a parte "união dinâmica de partições". Em termos de arquitetura técnica, a nova AQE é uma estrutura de planejamento dinâmico e replanejamento de consultas com base em estatísticas de runtime, que dá suporte a uma variedade de otimizações, como aquelas descritas neste artigo e pode ser estendida para habilitar otimizações mais potenciais.