Partager via


Gestion de requêtes volumineuses dans des workflows interactifs

Un défi avec les flux de travail de données interactifs consiste à gérer des requêtes volumineuses. Cela inclut les requêtes qui génèrent trop de lignes de sortie, récupèrent de nombreuses partitions externes ou calculent sur des jeux de données extrêmement volumineux. Ces requêtes peuvent être extrêmement lentes, saturées de ressources de calcul et rendre difficile pour d’autres personnes de partager le même calcul.

Query Watchdog est un processus qui empêche les requêtes de monopoliser les ressources de calcul en examinant les causes les plus courantes des requêtes volumineuses et en terminant les requêtes qui réussissent un seuil. Cet article explique comment activer et configurer Query Watchdog.

Important

Query Watchdog est activé pour tous les calculs à usage unique créés à l’aide de l’interface utilisateur.

Exemple de requête perturbatrice

Un analyste effectue des requêtes ad hoc dans un entrepôt de données juste-à-temps. L’analyste utilise un calcul de mise à l’échelle automatique partagée qui facilite l’utilisation d’un calcul unique en même temps pour plusieurs utilisateurs. Supposons qu’il existe deux tables qui ont chacun un million de lignes.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

Ces tailles de table sont gérables dans Apache Spark. Toutefois, ils incluent chacun une colonne join_key avec une chaîne vide dans chaque ligne. Cela peut se produire si les données ne sont pas parfaitement propres ou s’il existe une asymétrie significative des données où certaines clés sont plus répandues que d’autres. Ces clés de jointure vides sont beaucoup plus répandues que n’importe quelle autre valeur.

Dans le code suivant, l’analyste joint ces deux tables sur leurs clés, ce qui produit une sortie de 1 000 000 000 000 résultats, et tous ces deux produits sur un seul exécuteur (l’exécuteur qui obtient la clé " ") :

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

Cette requête semble être en cours d’exécution. Mais sans connaître les données, l’analyste constate qu’il n’y a « qu’une seule tâche » laissée au cours de l’exécution du travail. La requête ne se termine jamais, laissant l’analyste frustré et confus quant à la raison pour laquelle elle ne fonctionnait pas.

Dans ce cas, il n’existe qu’une seule clé de jointure problématique. D’autres fois, il peut y avoir beaucoup plus.

Activer et configurer Query Watchdog

Pour activer et configurer Query Watchdog, les étapes suivantes sont requises.

  • Activez Watchdog avec spark.databricks.queryWatchdog.enabled.
  • Configurez le runtime de tâche avec spark.databricks.queryWatchdog.minTimeSecs.
  • Affichez les résultats avec spark.databricks.queryWatchdog.minOutputRows.
  • Configurez le ratio de sortie avec spark.databricks.queryWatchdog.outputRatioThreshold.

Pour empêcher une requête de créer trop de lignes de sortie pour le nombre de lignes d’entrée, vous pouvez activer Query Watchdog et configurer le nombre maximal de lignes de sortie sous la forme d’un multiple du nombre de lignes d’entrée. Dans cet exemple, nous utilisons un ratio de 1 000 (valeur par défaut).

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

Cette dernière configuration déclare que toute tâche donnée ne doit jamais produire plus de 1 000 fois le nombre de lignes d’entrée.

Conseil

Le ratio de sortie est entièrement personnalisable. Nous vous recommandons de commencer plus bas et de voir quel seuil fonctionne bien pour vous et votre équipe. Une plage de 1 000 à 10 000 est un bon point de départ.

Non seulement Query Watchdog empêche les utilisateurs de monopoliser les ressources de calcul pour les travaux qui ne seront jamais terminés, il permet également de gagner du temps en cas d’échec rapide d’une requête qui n’aurait jamais été terminée. Par exemple, la requête suivante échoue après plusieurs minutes, car elle dépasse le ratio.

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

Voici ce que vous verrez :

Surveillance des requêtes

Il suffit généralement d’activer query Watchdog et de définir le ratio de seuil de sortie/entrée, mais vous avez également la possibilité de définir deux propriétés supplémentaires : spark.databricks.queryWatchdog.minTimeSecs et spark.databricks.queryWatchdog.minOutputRows. Ces propriétés spécifient la durée minimale d’exécution d’une tâche donnée dans une requête avant de l’annuler et le nombre minimal de lignes de sortie d’une tâche dans cette requête.

Par exemple, vous pouvez définir minTimeSecs sur une valeur plus élevée si vous souhaitez lui donner la possibilité de produire un grand nombre de lignes par tâche. De même, vous pouvez définir spark.databricks.queryWatchdog.minOutputRows à dix millions si vous souhaitez arrêter une requête uniquement après qu’une tâche dans cette requête a généré dix millions de lignes. Toute valeur inférieure et la requête réussira, même si le ratio production/consommation a été dépassé.

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

Conseil

Si vous configurez Query Watchdog dans un notebook, la configuration ne persiste pas dans les redémarrages de calcul. Si vous souhaitez configurer Query Watchdog pour tous les utilisateurs d’un calcul, nous vous recommandons d’utiliser une configuration de calcul .

Détecter la requête sur un jeu de données extrêmement volumineux

Une autre requête volumineuse classique peut analyser une grande quantité de données à partir de grandes tables/jeux de données. L’opération d’analyse peut durer longtemps et saturer les ressources de calcul (même la lecture des métadonnées d’une grande table Hive peut prendre beaucoup de temps). Vous pouvez définir maxHivePartitions pour empêcher l’extraction d’un trop grand nombre de partitions à partir d’une grande table Hive. De même, vous pouvez également définir maxQueryTasks pour limiter les requêtes sur un jeu de données extrêmement volumineux.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

Quand devez-vous activer Query Watchdog ?

Query Watchdog doit être activé pour le calcul d’analytique ad hoc où les analystes SQL et les scientifiques des données partagent un calcul donné et un administrateur doit s’assurer que les requêtes « jouent bien » entre elles.

Quand devez-vous désactiver Query Watchdog ?

En général, nous ne conseillons pas d’annuler avec impatience les requêtes utilisées dans un scénario ETL, car il n’y a généralement pas d’humain dans la boucle pour corriger l’erreur. Nous vous recommandons de désactiver Query Watchdog pour tout sauf le calcul analytique ad hoc.