Partager via


Gestion des grosses requêtes dans des workflows interactifs

Un défi avec les flux de travail de données interactifs consiste à gérer des requêtes volumineuses. Cela comprend les requêtes qui génèrent un trop grand nombre de lignes de sortie, récupèrent de nombreuses partitions externes ou effectuent des calculs sur des jeux de données extrêmement volumineux. Ces requêtes peuvent être extrêmement lentes, saturer les ressources de calcul et compliquer le partage du même calcul par d’autres personnes.

La surveillance des requêtes est un processus qui empêche les requêtes de monopoliser les ressources de calcul en examinant les causes les plus courantes des requêtes volumineuses et en terminant les requêtes qui dépassent un seuil. Cet article explique comment activer et configurer la surveillance des requêtes.

Important

La surveillance des requêtes est activée pour tous les calculs à usage général créés à l’aide de l’interface utilisateur.

Exemple de requête perturbatrice

Un analyste effectue des requêtes ad hoc dans un entrepôt de données juste-à-temps. L’analyste utilise un calcul à mise à l’échelle automatique partagé qui permet à plusieurs utilisateurs d’utiliser un seul calcul à la fois. Supposons que deux tables possèdent chacune un million de lignes.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

Ces tailles de table peuvent être gérées dans Apache Spark. Toutefois, elles incluent chacune une colonne join_key avec une chaîne vide dans chaque ligne. Cela peut se produire si les données ne sont pas parfaitement propres ou s’il y a un décalage de données significatif où certaines clés sont plus répandues que d’autres. Ces clés de jointure vides sont beaucoup plus répandues que toute autre valeur.

Dans le code suivant, l’analyste joint ces deux tables sur leurs clés, ce qui produit une sortie de 1 000 000 000 000 résultats, et tous ces deux produits sur un seul exécuteur (l’exécuteur qui obtient la clé " ") :

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

Cette requête semble être en cours d’exécution. Mais sans connaître les données, l’analyste constate qu’il n’y a plus qu’une seule tâche sur l’exécution du travail. La requête ne se termine jamais, laissant l’analyste frustré et déconcerter la raison pour laquelle il ne fonctionnait pas.

Dans ce cas, il n’y a qu’une seule clé de jointure problématique. D’autres fois, il peut y en avoir beaucoup plus.

Activer et configurer la surveillance des requêtes

Pour activer et configurer Query Watchdog, les étapes suivantes sont requises.

  • Activez Watchdog avec spark.databricks.queryWatchdog.enabled.
  • Configurez le temps d’exécution de la tâche avec spark.databricks.queryWatchdog.minTimeSecs.
  • Affichez les résultats avec spark.databricks.queryWatchdog.minOutputRows.
  • Configurez le ratio de sortie avec spark.databricks.queryWatchdog.outputRatioThreshold.

Pour empêcher une requête de créer un trop grand nombre de lignes de sortie pour le nombre de lignes d’entrée, vous pouvez activer la surveillance des requêtes et configurer le nombre maximal de lignes de sortie comme un multiple du nombre de lignes d’entrée. Dans cet exemple, nous utilisons un ratio de 1000 (valeur par défaut).

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

La dernière configuration déclare qu’une tâche donnée ne doit jamais produire plus de 1000 fois le nombre de lignes d’entrée.

Conseil

Le ratio de sortie est entièrement personnalisable. Nous vous recommandons de commencer à partir du bas et de voir quel seuil fonctionne bien pour vous et votre équipe. Une plage de 1 000 à 10 000 est un bon point de départ.

Non seulement la surveillance des requêtes empêche les utilisateurs de monopoliser les ressources de calcul pour les travaux qui ne se terminent pas, mais elle fait également gagner du temps en faisant échouer rapidement une requête qui n’aurait jamais été terminée. Par exemple, la requête suivante échouera au bout de quelques minutes, car elle dépasse le ratio.

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

Voici ce que vous pouvez voir :

Requête watchdog

Il est généralement suffisant d’activer la surveillance des requêtes et de définir le ratio seuil de sortie/entrée, mais vous avez également la possibilité de définir deux propriétés supplémentaires : spark.databricks.queryWatchdog.minTimeSecs et spark.databricks.queryWatchdog.minOutputRows . Ces propriétés spécifient la durée minimale pendant laquelle une tâche donnée dans une requête doit être exécutée avant d’être annulée et le nombre minimal de lignes de sortie pour une tâche dans cette requête.

Par exemple, vous pouvez définir minTimeSecs sur une valeur supérieure si vous souhaitez lui donner la possibilité de produire un grand nombre de lignes par tâche. De même, vous pouvez définir spark.databricks.queryWatchdog.minOutputRows sur 10 millions si vous souhaitez arrêter une requête uniquement après qu’une tâche de cette requête a généré 10 millions lignes. Tout ce qui est moins et la requête est réussie, même si le ratio de sortie/entrée a été dépassé.

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

Conseil

Si vous configurez la surveillance des requêtes dans un notebook, la configuration n’est pas conservée entre les redémarrages du calcul. Si vous souhaitez configurer la surveillance des requêtes pour tous les utilisateurs d’un calcul, nous vous recommandons d’utiliser une configuration de calcul.

Détecter une requête sur un jeu de données très volumineux

Une autre requête classique peut analyser une grande quantité de données à partir de grands tableaux/jeux de données. L’opération d’analyse peut durer longtemps et saturer les ressources de calcul (même la lecture des métadonnées d’une grande table Hive peut prendre beaucoup de temps). Vous pouvez définir maxHivePartitions pour empêcher l’extraction d’un trop grand nombre de partitions d’une table de grande ruche. De même, vous pouvez également définir maxQueryTasks pour limiter les requêtes sur un jeu de données extrêmement volumineux.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

Quand devez-vous activer la surveillance des requêtes ?

La surveillance des requêtes doit être activée pour le calcul d’analyse ad hoc où les scientifiques des données et analystes SQL partagent un calcul donné, et un administrateur doit s’assurer que les requêtes « fonctionnent correctement » les unes avec les autres.

Quand devez-vous activer la surveillance des requêtes ?

En général, nous ne conseillons pas d’annuler de manière dynamique les requêtes utilisées dans un scénario ETL, car il n’y a généralement pas de personne dans la boucle pour corriger l’erreur. Nous vous recommandons de désactiver la surveillance des requêtes pour tous les calculs d’analyse, sauf ad hoc.