Lectures et écritures en diffusion en continu sur des tables Delta
Delta Lake est profondément intégré à Spark Structured Streaming par le biais de readStream
et writeStream
. Delta Lake surmonte bon nombre des limitations généralement associées aux systèmes et aux fichiers de streaming, notamment :
- Le regroupement de petits fichiers produits par une ingestion à faible latence.
- Le maintien d’un traitement « une seule fois » avec plus d’un flux (ou des programmes de traitement par lots simultanés).
- La découverte efficace des nouveaux fichiers lors de l’utilisation de fichiers comme source d’un flux.
Remarque
Cet article détaille l’utilisation des tables Delta Lake comme sources et récepteurs de diffusion en continu. Pour découvrir comment charger des données à l’aide de tables de diffusion en continu dans Databricks SQL, consultez l’article Charger des données au moyen des tables de diffusion en continu dans Databricks SQL.
Pour plus d’informations sur les jointures statiques de flux avec Delta Lake, consultez Jointures statiques de flux.
Table Delta comme source
Structured Streaming lit de manière incrémentielle les tables Delta. Lorsqu’une requête de diffusion en continu est active sur une table Delta, les nouveaux enregistrements sont traités de manière idempotente lorsque les nouvelles versions de table sont validées dans la table source.
Les exemples de code suivants montrent la configuration d’une lecture en continu à l’aide du nom de la table ou du chemin d’accès au fichier.
Python
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Scala
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Important
Si le schéma d’une table Delta change après le début d’une lecture en continu sur la table, la requête échoue. Pour la plupart des modifications de schéma, vous pouvez redémarrer le flux pour résoudre l’incompatibilité de schéma et poursuivre le traitement.
Dans Databricks Runtime 12.2 LTS et les versions antérieures, vous ne pouvez pas diffuser en continu à partir d’une table Delta avec le mappage de colonnes activé qui a subi une évolution de schéma non additive, telle que le changement de nom ou la suppression de colonnes. Si vous souhaitez obtenir plus d’informations, consultez Diffusion en continu avec un mappage de colonnes et des modifications de schéma.
Limiter le débit d’entrée
Les options suivantes sont disponibles pour contrôler les microlots :
maxFilesPerTrigger
: Nombre de nouveaux fichiers à prendre en compte dans chaque microlot. La valeur par défaut est 1000.maxBytesPerTrigger
: Quantité de données traitées dans chaque microlot. Cette option définit une valeur « soft max », qui signifie qu’un lot traite approximativement cette quantité de données et peut traiter plus que la limite afin de faire avancer la requête de streaming dans les cas où la plus petite unité d’entrée est supérieure à cette limite. Elle n’est pas définie par défaut.
Si vous utilisez maxBytesPerTrigger
conjointement avec maxFilesPerTrigger
, le microlot traite les données jusqu’à ce que la limite maxFilesPerTrigger
ou maxBytesPerTrigger
soit atteinte.
Remarque
Dans les cas où les transactions de table source sont nettoyées en raison de la logRetentionDuration
configuration et que la requête de diffusion en continu tente de traiter ces versions, par défaut, la requête ne parvient pas à éviter la perte de données. Vous pouvez choisir l’option failOnDataLoss
à false
pour ignorer les données perdues et poursuivre le traitement.
Envoyer en streaming un flux de capture des changements de données (CDC) Delta Lake
Le flux des changements de données Delta Lake enregistre les modifications apportées à une table Delta, y compris les mises à jour et les suppressions. Lorsque cela est activé, vous pouvez envoyer en streaming un flux des changements de données et écrire une logique pour effectuer les insertions, les mises à jour et les suppressions dans les tables en aval. La sortie du flux des changements de données diffère légèrement de la table Delta qu’elle décrit, mais cela fournit une solution pour propager des modifications incrémentielles vers les tables en aval dans une architecture de médaillon.
Important
Dans Databricks Runtime 12.2 LTS et les versions antérieures, vous ne pouvez pas diffuser en continu à partir du flux des changements de données d’une table Delta avec le mappage de colonnes activé qui a subi une évolution de schéma non additive, telle que le changement de nom ou la suppression de colonnes. Consultez Diffusion en continu avec un mappage de colonnes et des modifications de schéma.
Ignorer les mises à jour et les suppressions
Le flux structuré ne traite pas les entrées qui ne sont pas des ajouts et lève une exception si des modifications sont apportées à la table utilisée comme source. Il existe deux stratégies principales pour traiter les modifications qui ne peuvent pas être propagées automatiquement en aval :
- Vous pouvez supprimer la sortie et le point de contrôle, puis redémarrer le flux depuis le début.
- Vous pouvez définir l’une ou l’autre de ces deux options :
ignoreDeletes
: ignore les transactions qui suppriment des données aux limites de la partition.skipChangeCommits
: ignorer les transactions qui suppriment ou modifient des enregistrements existants.skipChangeCommits
englobeignoreDeletes
.
Remarque
Dans Databricks Runtime 12.2 LTS et versions ultérieures, skipChangeCommits
remplace le paramètre ignoreChanges
précédent. Dans Databricks Runtime 11.3 LTS et versions antérieures, ignoreChanges
est la seule option prise en charge.
La sémantique pour ignoreChanges
diffère considérablement de skipChangeCommits
. Lorsque ignoreChanges
est activé, les fichiers de données réécrits dans la table source sont réécrits après une opération de modification de données telle que UPDATE
, MERGE INTO
, DELETE
(dans des partitions) ou OVERWRITE
. Les lignes inchangées sont souvent émises à côté de nouvelles lignes, de sorte que les consommateurs en aval doivent être en mesure de gérer les doublons. Les suppressions ne sont pas propagées en aval. ignoreChanges
englobe ignoreDeletes
.
skipChangeCommits
ignore entièrement les opérations de modification de fichier. Les fichiers de données qui sont réécrits dans la table source en raison d’une opération de modification des données comme UPDATE
, MERGE INTO
, DELETE
et OVERWRITE
sont entièrement ignorés. Pour refléter les modifications apportées aux tables sources en amont, vous devez implémenter une logique distincte pour propager ces modifications.
Les charges de travail configurées avec ignoreChanges
continuent de fonctionner à l’aide d’une sémantique connue, mais Databricks recommande d’utiliser skipChangeCommits
pour toutes les nouvelles charges de travail. La migration des charges de travail à l’aide de ignoreChanges
vers skipChangeCommits
nécessite une logique de refactorisation.
Exemple
Par exemple, supposons que vous ayez une table user_events
avec des colonnes date
, user_email
et action
, partitionnée par date
. Vous sortez de la table user_events
et vous devez en supprimer les données conformément au RGPD.
Lorsque vous supprimez des données aux limites des partitions (c’est-à-dire que WHERE
se trouve sur une colonne de partition), les fichiers sont déjà segmentés par valeur, de sorte que la suppression supprime uniquement ces fichiers des métadonnées. Lorsque vous supprimez une partition entière de données, vous pouvez utiliser les éléments suivants :
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
Si vous supprimez des données dans plusieurs partitions (dans cet exemple, le filtrage sur user_email
), utilisez la syntaxe suivante :
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Si vous mettez à jour un user_email
avec l’instruction UPDATE
, le fichier contenant le user_email
en question est réécrit. Utilisez skipChangeCommits
pour ignorer les fichiers de données modifiés.
Spécifier la position initiale
Vous pouvez utiliser les options suivantes pour spécifier le point de départ de la source de streaming Delta Lake sans traiter la totalité de la table.
startingVersion
: Version de Delta Lake de départ. Databricks recommande d’omettre cette option pour la plupart des charges de travail. Lorsqu’il n’est pas défini, le flux démarre à partir de la dernière version disponible, y compris une capture instantanée complète de la table à ce moment-là.S’il est spécifié, le flux lit toutes les modifications apportées à la table Delta à partir de la version spécifiée (inclusive). Si la version spécifiée n’est plus disponible, le flux ne démarre pas. Vous pouvez obtenir les versions validées à partir de la colonne
version
de la sortie de la commande DESCRIBE HISTORY.Pour retourner uniquement les dernières modifications, spécifiez
latest
.startingTimestamp
: Timestamp de départ. Toutes les modifications de table validées à partir de ce timestamp (inclus) sont lues par le lecteur de streaming. Si le timestamp fourni précède toutes les validations de table, la lecture en continu commence par le timestamp disponible le plus ancien. Valeurs possibles :- Une chaîne de timestamp. Par exemple :
"2019-01-01T00:00:00.000Z"
. - Une chaîne de date. Par exemple,
"2019-01-01"
- Une chaîne de timestamp. Par exemple :
Vous ne pouvez pas définir les deux options en même temps. Elles prennent effet uniquement lors du démarrage d’une nouvelle requête de streaming. Si une requête de streaming a démarré et que sa progression a été enregistrée dans son point de contrôle, ces options sont ignorées.
Important
Bien que vous puissiez démarrer la source de streaming à partir d’une version ou d’un timestamp spécifié, le schéma de la source de streaming est toujours le schéma le plus récent de la table Delta. Vous devez vous assurer qu’aucune modification de schéma incompatible n’a été apportée à la table Delta après la version ou le timestamp spécifié. Sinon, la source de streaming peut renvoyer des résultats incorrects lors de la lecture des données avec un schéma incorrect.
Exemple
Par exemple, supposons que vous ayez une table user_events
. Si vous souhaitez lire les modifications apportées depuis la version 5, utilisez :
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Si vous souhaitez lire les modifications apportées depuis la version 2018-10-18, utilisez :
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Traiter la capture instantanée initiale sans suppression des données
Remarque
Cette fonctionnalité est disponible sur Databricks Runtime 11.3 LTS et versions ultérieures. Cette fonctionnalité est disponible en préversion publique.
Lorsque vous utilisez une table Delta comme source de flux, la requête traite d’abord toutes les données présentes dans la table. La table Delta de cette version est appelée capture instantanée initiale. Par défaut, les fichiers de données de la table Delta sont traités en fonction du dernier fichier modifié. Toutefois, l’heure de la dernière modification ne représente pas nécessairement l’ordre chronologique des événements d’enregistrement.
Dans une requête de diffusion en continu avec état avec un filigrane défini, le traitement des fichiers par heure de modification peut entraîner le traitement des enregistrements dans un ordre incorrect. Cela peut entraîner le marquage des enregistrements en tant qu’événements en retard par le filigrane.
Vous pouvez éviter le problème de suppression de données en activant l’option suivante :
- withEventTimeOrder : indique si la capture instantanée initiale doit être traitée selon l’ordre chronologique des événements.
Une fois l’ordre chronologique des événements activé, l’intervalle de temps d’événement des données de capture instantanée initiale est divisé en compartiments de temps. Chaque micro-lot traite un compartiment en filtrant les données dans l’intervalle de temps. Les options de configuration maxFilesPerTrigger et maxBytesPerTrigger sont toujours applicables pour contrôler la taille du microbatch, mais de manière approximative en raison de la nature du traitement.
Le graphique ci-dessous montre ce processus :
Informations notables sur cette fonctionnalité :
- Le problème de suppression de données se produit uniquement lorsque la capture instantanée initiale Delta d’une requête de diffusion en continu avec état est traitée dans l’ordre par défaut.
- Vous ne pouvez pas modifier
withEventTimeOrder
une fois la requête de flux démarrée pendant le traitement de la capture instantanée initiale. Pour redémarrer le processus en modifiantwithEventTimeOrder
, vous devez supprimer le point de contrôle. - Si vous exécutez une requête de flux avec l’option WithEventTimeOrder activée, vous ne pouvez pas passer à une version DBR antérieure qui ne prend pas en charge cette fonctionnalité tant que le traitement de la capture instantanée initiale n’est pas terminé. Si vous devez passer à une version antérieure, vous pouvez attendre la fin du traitement de la capture instantanée initiale, ou supprimer le point de contrôle et redémarrer la requête.
- Cette fonctionnalité n’est pas prise en charge dans les rares scénarios suivants :
- La colonne d’heure d’événement est une colonne générée et il existe des transformations de non-projection entre la source Delta et le filigrane.
- Il existe un filigrane qui a plusieurs sources Delta dans la requête de flux.
- Avec l’ordre chronologique des événements activé, les performances de traitement de la capture instantanée initiale Delta peuvent être plus lentes.
- Chaque micro-lot analyse la capture instantanée initiale pour filtrer les données dans l’intervalle de temps d’événement correspondant. Pour une action de filtrage plus rapide, il est conseillé d’utiliser une colonne source Delta comme heure d’événement afin de pouvoir appliquer la fonctionnalité Ignorer des données (consultez Saut de données pour Delta Lake pour savoir si elle est applicable). En outre, le partitionnement de table le long de la colonne de temps d’événement peut accélérer la vitesse de traitement. Vous pouvez vérifier l’interface utilisateur Spark pour voir le nombre de fichiers delta analysés par un micro-lot spécifique.
Exemple
Supposons que vous ayez une table user_events
avec une colonne event_time
. Votre requête de diffusion en continu est une requête d’agrégation. Si vous souhaitez vous assurer qu’aucune suppression de données n’aura lieu lors du traitement de la capture instantanée initiale, vous pouvez utiliser :
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Notes
Vous pouvez également activer cette fonctionnalité avec la configuration Spark sur le cluster qui s’applique à toutes les requêtes de diffusion en continu : spark.databricks.delta.withEventTimeOrder.enabled true
.
Table Delta comme récepteur
Vous pouvez également écrire des données dans une table Delta à l’aide de Structured Streaming. Le journal des transactions permet à Delta Lake de garantir un traitement « une seule fois », même si d’autres flux ou requêtes par lot s’exécutent simultanément sur la table.
Notes
La fonction VACUUM
Delta Lake supprime tous les fichiers non gérés par Delta Lake, mais ignore les répertoires qui commencent par _
. Vous pouvez stocker sans risque des points de contrôle en même temps que d’autres données et métadonnées pour une table Delta à l’aide d’une structure de répertoires telle que <table-name>/_checkpoints
.
Métriques
Vous pouvez connaître le nombre d’octets et le nombre de fichiers encore à traiter dans un processus de requête de streaming grâce aux métriques numBytesOutstanding
et numFilesOutstanding
. Les métriques supplémentaires sont les suivantes :
numNewListedFiles
: nombre de fichiers Delta Lake listés afin de calculer le backlog pour ce lot.backlogEndOffset
: version de table utilisée pour calculer le backlog.
Si vous exécutez le flux dans un notebook, vous pouvez voir ces métriques sous l’onglet Données brutes du tableau de bord de progression des requêtes de diffusion en continu :
{
"sources" : [
{
"description" : "DeltaSource[file:/path/to/source]",
"metrics" : {
"numBytesOutstanding" : "3456",
"numFilesOutstanding" : "8"
},
}
]
}
Mode d’ajout
Par défaut, les flux s’exécutent en mode ajout, ce qui adjoint de nouveaux enregistrements à la table.
Utilisez la méthode toTable
lors de la diffusion en continu vers des tables, comme dans l’exemple suivant :
Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
Scala
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Mode Complet
Vous pouvez également utiliser Structured Streaming pour remplacer la table entière par chaque lot. Un exemple de cas d’usage consiste à calculer un résumé à l’aide de l’agrégation :
Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
Scala
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
L’exemple précédent met continuellement à jour une table qui contient le nombre agrégé d’événements par client.
Pour les applications dont les exigences en matière de latence sont plus souples, vous pouvez économiser des ressources de calcul grâce à des déclencheurs à usage unique. Utilisez-les pour mettre à jour les tables d’agrégation récapitulatives selon une planification donnée, en traitant uniquement les nouvelles données qui sont arrivées depuis la dernière mise à jour.
Upsert à partir de requêtes de diffusion en continu à l’aide de foreachBatch
Vous pouvez utiliser une combinaison de merge
et foreachBatch
pour écrire des upserts complexes à partir d’une requête de streaming dans une table Delta. Consultez Utiliser foreachBatch pour écrire dans des récepteurs de données arbitraires.
Ce modèle a de nombreuses applications, dont celles-ci :
- Écrire des agrégats de diffusion en continu en mode mise à jour : cette solution est bien plus efficace que le mode complet.
- Écrire un flux de modifications de base de données dans une table Delta : la requête de fusion pour l’écriture des changements de données peut être utilisée dans
foreachBatch
pour appliquer en permanence un flux de modifications à une table Delta. - Écrire un flux de données dans une table Delta avec déduplication : la requête de fusion par insertion uniquement pour déduplication peut être utilisée dans
foreachBatch
pour écrire en continu des données (avec des doublons) dans une table Delta avec déduplication automatique.
Notes
- Assurez-vous que votre instruction
merge
dansforeachBatch
est idempotente, car des redémarrages de la requête de diffusion en continu peuvent appliquer l’opération sur le même lot de données plusieurs fois. - Quand
merge
est utilisé dansforeachBatch
, le taux de données d'entrée de la requête de diffusion en continu (signalé parStreamingQueryProgress
et visible dans le graphique du taux du notebook) peut être signalé comme un multiple du taux réel de génération des données à la source. Cela est dû au fait quemerge
lit les données d’entrée plusieurs fois, entraînant une multiplication des métriques d’entrée. S’il s’agit d’un goulot d’étranglement, vous pouvez mettre en cache le lot tramedonnées avant l’opérationmerge
, puis le sortir du cache après l’opérationmerge
.
L’exemple suivant montre comment utiliser SQL dans foreachBatch
pour accomplir cette tâche :
Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Vous pouvez également choisir d’utiliser les API Delta Lake pour effectuer des upserts de streaming, comme dans l’exemple suivant :
Scala
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Écritures de tables idempotentes dans foreachBatch
Remarque
Databricks recommande de configurer une écriture de streaming distincte pour chaque récepteur que vous souhaitez mettre à jour. L’utilisation de foreachBatch
pour écrire dans plusieurs tables sérialise les écritures, ce qui réduit la parallélisation et augmente la latence globale.
Les tables Delta prennent en charge les options suivantes DataFrameWriter
pour effectuer des écritures dans plusieurs tables au sein d'idempotents foreachBatch
:
txnAppId
: chaîne unique que vous pouvez transmettre à chaque écriture DataFrame. Par exemple, vous pouvez utiliser l’ID StreamingQuery commetxnAppId
.txnVersion
: Nombre à croissance monotone qui fait office de version de transaction.
Delta Lake utilise la combinaison de txnAppId
et txnVersion
pour identifier les écritures en double et les ignorer.
Si l’écriture d’un lot est interrompue en cas d’échec, la réexécution du lot utilise la même application et le même ID de lot pour aider le runtime à identifier correctement les écritures en double et à les ignorer. L’ID d’application (txnAppId
) peut être toute chaîne unique générée par l’utilisateur et ne doit pas nécessairement être liée à l’ID du flux. Consultez Utiliser foreachBatch pour écrire dans des récepteurs de données arbitraires.
Avertissement
Si vous supprimez le point de contrôle de diffusion en continu et redémarrez la requête avec un nouveau point de contrôle, vous devez fournir un autre txnAppId
. Les nouveaux points de contrôle commencent par un ID de lot de 0
. Delta Lake utilise l’ID de lot et txnAppId
comme clé unique, et ignore les lots avec des valeurs déjà vues.
L’exemple de code suivant illustre ce modèle :
Python
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}