Partager via


Appliquer des filigranes pour contrôler les seuils de traitement des données

Cet article présente les concepts de base du filigrane et fournit des recommandations pour l’utilisation de filigranes dans les opérations courantes de diffusion en continu avec état. Vous devez appliquer des filigranes aux opérations de diffusion en continu avec état pour éviter d’augmenter à l’infini la quantité de données conservées dans l’état, ce qui pourrait entraîner des problèmes de mémoire et augmenter les latences de traitement pendant les opérations de streaming de longue durée.

Qu’est-ce qu’un filigrane ?

Structured Streaming utilise des filigranes pour contrôler le seuil pendant combien de temps le traitement des mises à jour pour une entité d’état donnée. Voici des exemples courants d’entités d’état :

  • Agrégations sur une fenêtre de temps.
  • Clés uniques dans une jointure entre deux flux.

Lorsque vous déclarez un filigrane, vous spécifiez un champ d’horodatage et un seuil de filigrane sur un DataFrame de streaming. À mesure que de nouvelles données arrivent, le gestionnaire d’état suit l’horodatage le plus récent dans le champ spécifié et traite tous les enregistrements dans le seuil de retard.

L’exemple suivant applique un seuil de filigrane de 10 minutes à un nombre fenêtré :

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Dans cet exemple :

  • La colonneevent_time est utilisée pour définir un filigrane de 10 minutes et une fenêtre bascule de 5 minutes.
  • Un décompte est collecté pour chaque id observée pour chaque fenêtre de 5 minutes qui ne se chevauchent pas.
  • Les informations d’état sont conservées pour chaque nombre jusqu’à ce que la fin de la fenêtre soit 10 minutes plus ancienne que la dernière.event_time

Important

Les seuils de filigrane veillent à ce des enregistrements arrivant dans le seuil spécifié sont traités en fonction de la sémantique de la requête définie. Les enregistrements arrivant en retard en dehors du seuil spécifié peuvent encore être traités en tirant parti de métriques de requête, mais cela n’est pas garanti.

Comment les filigranes ont-ils un impact sur le temps de traitement et le débit ?

Les filigranes interagissent avec les modes de sortie pour contrôler le moment où les données sont écrites dans le récepteur. Étant donné que les filigranes réduisent la quantité totale d’informations d’état à traiter, une utilisation efficace des filigranes est essentielle pour un débit de streaming avec état efficace.

Notes

Tous les modes de sortie ne sont pas pris en charge pour toutes les opérations avec état.

Filigranes et mode de sortie pour les agrégations fenêtrés

Le tableau suivant détaille le traitement des requêtes avec agrégation sur un horodatage avec un filigrane défini :

Mode de sortie Comportement
Ajouter Les lignes sont écrites dans la table cible une fois le seuil de filigrane dépassé. Toutes les écritures sont retardées en fonction du seuil de retard. L’ancien état d’agrégation est supprimé une fois le seuil dépassé.
Update Les lignes sont écrites dans la table cible à mesure que les résultats sont calculés et peuvent être mises à jour et remplacées à mesure que de nouvelles données arrivent. L’ancien état d’agrégation est supprimé une fois le seuil dépassé.
Terminé L’état d’agrégation n’est pas supprimé. La table cible est réécrite avec chaque déclencheur.

Filigranes et sortie pour les jointures de flux de flux

Les jointures entre plusieurs flux prennent uniquement en charge le mode d’ajout, et les enregistrements correspondants sont écrits dans chaque lot qu’ils sont découverts. Pour les jointures internes, Databricks recommande de définir un seuil de filigrane sur chaque source de données de streaming. Cela permet d’ignorer les informations d’état pour les anciens enregistrements. Sans filigranes, flux structuré tente de joindre chaque clé des deux côtés de la jointure à chaque déclencheur.

Flux structuré a une sémantique spéciale pour prendre en charge les jointures externes. Le filigrane est obligatoire pour les jointures externes, car il indique quand une clé doit être écrite avec une valeur null après avoir été sans correspondance. Notez que si les jointures externes peuvent être utiles pour l’enregistrement des enregistrements qui ne sont jamais mis en correspondance pendant le traitement des données, car les jointures n’écrivent que dans des tables en tant qu’opérations d’ajout, ces données manquantes ne sont enregistrées qu’une fois le seuil de retard passé.

Contrôle du seuil des données tardives avec une politique de filigrane multiple dans le flux structuré

Lorsque vous utilisez plusieurs entrées de Structured Streaming, vous pouvez définir plusieurs limites afin de contrôler les seuils de tolérance pour les données tardives. La configuration de limites vous permet de contrôler les informations d’état, et a une incidence sur la latence.

Une requête de diffusion en continu peut avoir plusieurs flux d’entrée réunis ou joints. Chacun des flux d’entrée peut avoir un seuil distinct de données tardives, qui doit être toléré pour les opérations avec état. Spécifiez ces seuils à l’aide de withWatermarks("eventTime", delay) sur chacun des flux d’entrée. Voici un exemple de requête avec des jointures flux-flux.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Lors de l’exécution de la requête, la Structured Streaming suit individuellement la durée maximale d’événement observée dans chaque flux d’entrée, calcule des limites en fonction du retard correspondant et choisit sur cette base une seule limite globale à utiliser pour les opérations avec état. Par défaut, la valeur minimale est choisie comme limite globale, car elle garantit qu’aucune donnée ne sera supprimée accidentellement parce que tardive si l’un des flux est en retard sur les autres (par exemple, l’un des flux cesse de recevoir des données en raison d’erreurs en amont). En d’autres termes, la limite globale se déplace en toute sécurité au rythme du flux le plus lent, et le résultat de la requête est retardé en conséquence.

Si vous souhaitez obtenir des résultats plus rapide, vous pouvez définir la stratégie de limites multiples pour choisir la valeur maximale comme limite globale en définissant la configuration SQL spark.sql.streaming.multipleWatermarkPolicy sur max (la configuration par défaut est min). Cela permet à la limite globale de se déplacer au rythme du flux le plus rapide. Toutefois, cette configuration a pour effet d’annuler les données des flux les plus lents. À cause de cela, Databricks recommande d’utiliser cette configuration judicieusement.

Supprimer des doublons dans un filigrane

Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez dédupliquer des enregistrements au sein d’un seuil de filigrane en tirant parti d’un identificateur unique.

Structured Streaming fournit des garanties de traitement en une seule fois, mais ne déduplique pas automatiquement des enregistrements à partir de sources de données. Vous pouvez utiliser dropDuplicatesWithinWatermark pour dédupliquer des enregistrements sur n’importe quel champ spécifié, ce qui vous permet de supprimer les doublons d’un flux, même en cas de différence de champs (telle que l’heure d’événement ou l’heure d’arrivée).

La suppression des enregistrements dupliqués arrivant dans le filigrane spécifié est garantie. Cette garantie est stricte dans une seule direction et vous pouvez également supprimer des enregistrements en double qui arrivent en dehors du seuil spécifié. Pour supprimer tous les doublons, vous devez définir le seuil de délai d’un filigrane supérieur au nombre maximal de différences d’horodatage parmi les événements dupliqués.

Vous devez spécifier un filigrane pour utiliser la méthode dropDuplicatesWithinWatermark, comme dans l’exemple suivant :

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])