Partager via


Optimize le traitement avec état dans Delta Live Tables avec des filigranes

Pour gérer efficacement les données conservées dans l’état, utilisez des filigranes lors de l’exécution d’un traitement de flux avec état dans Delta Live Tables, y compris les agrégations, les jointures et la déduplication. Cet article explique comment utiliser des filigranes dans vos requêtes Delta Live Tables et inclut des exemples d’opérations recommandées.

Remarque

Pour vous assurer que les requêtes qui effectuent des agrégations sont traitées de manière incrémentielle et non entièrement recalculées avec chaque update, vous devez utiliser des watermarks.

Qu’est-ce qu’un watermark?

Dans le traitement de flux, un watermark est une fonctionnalité Apache Spark qui peut définir un seuil basé sur le temps pour le traitement des données lors de l’exécution d’opérations avec état telles que des agrégations. Les données arrivant sont traitées jusqu'à ce que le seuil soit atteint, moment auquel le temps window imposé par le seuil est clôturé. Les filigranes peuvent être utilisés pour éviter les problèmes lors du traitement des requêtes, principalement lors du traitement de jeux de données plus volumineux ou d’un traitement de longue durée. Ces problèmes peuvent inclure une latence élevée dans la production de résultats et même des erreurs de mémoire insuffisante (OOM) en raison de la quantité de données conservées en l’état pendant le traitement. Étant donné que les données de diffusion en continu sont intrinsèquement non ordonnées, les filigranes prennent également en charge le calcul correct des opérations telles que les agrégations de window de temps.

Pour en savoir plus sur l’utilisation de filigranes dans le traitement de flux, consultez Filigrane dans Apache Spark Structured Streaming et Appliquer des filigranes pour contrôler les seuils de traitement des données.

Comment définir une watermark ?

Vous définissez un watermark en spécifiant un champ d’horodatage et une valeur représentant le seuil de temps pour l’arrivée de données tardives. Les données sont considérées tardives si elles arrivent après le seuil de temps défini. Par exemple, si le seuil est défini sur 10 minutes, les enregistrements arrivant après le seuil de 10 minutes peuvent être supprimés.

Étant donné que les enregistrements qui arrivent après le seuil défini peuvent être supprimés, la sélection d’un seuil qui répond à votre latence par rapport aux exigences de correction est importante. Le choix d’un seuil plus petit entraîne l’émission d’enregistrements plus tôt, mais signifie également que les enregistrements en retard sont plus susceptibles d’être supprimés. Un seuil plus important signifie une attente plus longue, mais des données peut-être plus complètes. En raison de la taille d’état supérieure, un seuil plus élevé peut également nécessiter des ressources informatiques supplémentaires. Étant donné que la valeur de seuil dépend de vos besoins en matière de données et de traitement, le test et la surveillance de votre traitement sont importants pour déterminer un seuil optimal.

Vous utilisez la fonction withWatermark() en Python pour définir une watermark. Dans SQL, utilisez la clause WATERMARK pour définir une watermark:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Utiliser des filigranes avec des jointures de flux-flux

Pour les jointures entre flux, vous devez définir un watermark de chaque côté du join et une clause d’intervalle de temps. Étant donné que chaque source join a une vue incomplète des données, la clause d’intervalle de temps est requise pour indiquer au moteur de diffusion en continu quand aucune correspondance supplémentaire ne peut être effectuée. La clause d’intervalle de temps doit utiliser les mêmes champs que ceux utilisés pour définir les filigranes.

Étant donné qu’il peut arriver que chaque flux nécessite des seuils différents pour les filigranes, il n’est pas nécessaire que les flux aient les mêmes seuils. Pour éviter les données manquantes, le moteur de streaming gère une watermark globale basée sur le flux le plus lent.

L’exemple suivant joint un flux d’impressions publicitaires et un flux de clics d’utilisateur sur les publicités. Dans cet exemple, un clic doit se produire dans les 3 minutes après l’impression. Une fois l’intervalle de temps de 3 minutes passé, les lignes de l’état qui ne peuvent plus être mises en correspondance sont supprimées.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Effectuer des agrégations fenêtrées avec des filigranes

Une opération avec état courante sur les données de streaming est une agrégation fenêtrée. Les agrégations fenêtrées sont similaires aux agrégations groupées, sauf que les values d’agrégation sont retournées pour l’set de lignes qui font partie de la window définie.

Une window peut être définie comme une certaine longueur, et une opération d’agrégation peut être effectuée sur toutes les lignes qui font partie de cette window. Spark Streaming prend en charge trois types de fenêtres :

  • Fenêtres bascule (fixe) : une série d’intervalles de temps contigus fixes, qui ne se chevauchent pas. Un enregistrement d’entrée appartient à une seule window.
  • Fenêtres glissantes : similaires aux fenêtres bascules, les fenêtres glissantes sont de taille fixe, mais les fenêtres peuvent se chevaucher et un enregistrement peut tomber dans plusieurs fenêtres.

Lorsque les données arrivent au-delà de la fin de l'window plus la longueur du watermark, aucune nouvelle donnée n’est acceptée pour l'window, le résultat de l’agrégation est émis et l’état de la window est supprimé.

L’exemple suivant calcule le total des impressions toutes les 5 minutes en utilisant un windowfixe. Dans cet exemple, la clause select utilise l’alias impressions_window, puis la window elle-même est définie dans le cadre de la clause GROUP BY. Le window doit être basé sur le même horodatage column que le watermark, le clickTimestampcolumn dans cet exemple.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Exemple similaire dans Python pour calculer les bénéfices sur les fenêtres fixes horaires :

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Dédupliquer les enregistrements de diffusion en continu

Structured Streaming a des garanties de traitement en une seule fois, mais ne déduplique pas automatiquement des enregistrements à partir de sources de données. Par exemple, étant donné que de nombreuses files d’attente de messages ont au moins une fois des garanties, les enregistrements en double doivent être attendus lors de la lecture de l’une de ces files d’attente de messages. Vous pouvez utiliser la fonction dropDuplicatesWithinWatermark() pour dédupliquer des enregistrements sur n’importe quel champ spécifié, ce qui supprime les doublons d’un flux, même en cas de différence de champs (telle que l’heure d’événement ou l’heure d’arrivée). Vous devez spécifier une watermark pour utiliser la fonction dropDuplicatesWithinWatermark(). Toutes les données en double qui arrivent dans l’intervalle de temps spécifié par l'watermark sont supprimées.

Les données ordonnées sont importantes car les données désordonnées provoquent la valeur watermark à se retrouver plus avancée de manière incorrecte. Ensuite, lorsque des données plus anciennes arrivent, elles sont considérées comme tardives et sont supprimées. Utilisez l’option withEventTimeOrder pour traiter l’instantané initial dans l’ordre en fonction de l’horodatage spécifié dans watermark. L’option withEventTimeOrder peut être déclarée dans le code définissant le jeu de données ou dans les paramètres de pipeline à l’aide de spark.databricks.delta.withEventTimeOrder.enabled. Par exemple :

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Remarque

L’option withEventTimeOrder est prise en charge uniquement avec Python.

Dans l’exemple suivant, les données sont traitées dans l'ordre de clickTimestamp, et les enregistrements qui arrivent dans un intervalle de 5 secondes entre eux et qui contiennent des doublons de userId et clickAdIdcolumns sont supprimés.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optimize la configuration du pipeline pour le traitement avec état

Pour éviter les problèmes de production et la latence excessive, Databricks recommande d’activer la gestion de l’état basée sur RocksDB pour votre traitement de flux avec état, en particulier si votre traitement nécessite d’enregistrer une grande quantité d’états intermédiaires.

Les pipelines sans serveur gèrent automatiquement les configurations du magasin d’états.

Vous pouvez activer la gestion d'état basée sur RocksDB en définissant la configuration suivante avant de déployer un pipeline :

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Pour en savoir plus sur le magasin d’état RocksDB, y compris les recommandations de configuration pour RocksDB, consultez Configurer un magasin d’état RocksDB sur Azure Databricks.