Sélectionner un mode de sortie pour Structured Streaming
Cet article décrit la sélection d’un mode de sortie pour la diffusion en continu avec état. Seuls les flux avec état contenant des agrégations nécessitent une configuration en mode sortie.
Les jointures prennent uniquement en charge le mode de sortie Ajout, et le mode de sortie n’a pas d’impact sur la déduplication. Les opérateurs avec état arbitraire mapGroupsWithState
et flatMapGroupsWithState
émettent des enregistrements à l’aide de leur propre logique personnalisée, de sorte que le mode de sortie du flux n’affecte pas leur comportement.
Pour la diffusion en continu sans état, tous les modes de sortie se comportent de la même façon.
Pour configurer correctement le mode de sortie, vous devez comprendre le streaming avec état, les filigranes et les déclencheurs. Voir les articles suivants :
- Qu’est-ce que la diffusion en continu avec état ?
- Appliquer des filigranes pour contrôler les seuils de traitement des données
- Configurer des intervalles de déclencheur Structured Streaming
Qu’est-ce que le mode de sortie ?
Le mode de sortie d’une requête Structured Streaming détermine les enregistrements émis par les opérateurs de la requête pendant chaque déclencheur. Les trois types d’enregistrements qui peuvent être émis sont les suivants :
- Les enregistrements que le traitement futur ne change pas.
- Enregistrements qui ont changé depuis le dernier déclencheur.
- Tous les enregistrements de la table d’état.
Connaître les types d’enregistrements à émettre est important pour les opérateurs avec état, car une ligne particulière produite par un opérateur avec état peut passer du déclencheur au déclencheur. Par exemple, en tant qu’opérateur d’agrégation de streaming reçoit davantage de lignes pour une fenêtre particulière, les valeurs d’agrégation de cette fenêtre peuvent changer entre les déclencheurs.
Pour les opérateurs sans état, la distinction entre les types d’enregistrements n’affecte pas le comportement de l’opérateur. Les enregistrements émis par un opérateur sans état pendant un déclencheur sont toujours les enregistrements sources traités pendant ce déclencheur.
Modes de sortie disponibles
Il existe trois modes de sortie qui indiquent à un opérateur les enregistrements à émettre pendant un déclencheur particulier :
Mode de sortie | Description |
---|---|
Mode Ajout (par défaut) | Par défaut, les requêtes de streaming s’exécutent en mode Ajout. Dans ce mode, les opérateurs émettent uniquement des lignes qui ne changent pas dans les déclencheurs futurs. Les opérateurs avec état utilisent le filigrane pour déterminer quand cela se produit. |
Mode de mise à jour | En mode mise à jour, les opérateurs émettent toutes les lignes qui ont changé pendant le déclencheur, même si l’enregistrement émis peut changer dans un déclencheur suivant. |
Mode complet | Le mode complet fonctionne uniquement avec les agrégations de streaming. En mode complet, toutes les lignes résultantes produites par l’opérateur sont émises en aval. |
Considérations relatives à la production
Pour de nombreuses opérations de diffusion en continu avec état, vous devez choisir entre les modes d’ajout et de mise à jour. Les sections suivantes décrivent les considérations susceptibles d’informer votre décision.
Remarque
Le mode complet comporte certaines applications, mais peut fonctionner mal en tant que mises à l’échelle des données. Databricks recommande d’utiliser des vues matérialisées pour obtenir des garanties sémantiques associées au mode complet avec traitement incrémentiel pour de nombreuses opérations avec état. Consultez Utiliser des vues matérialisées dans Databricks SQL.
Sémantique d’application
La sémantique des applications décrit comment les applications en aval utilisent les données de streaming.
Si les services en aval doivent effectuer une seule action pour chaque écriture en aval, utilisez le mode Ajout dans la plupart des cas. Par exemple, si vous avez un service de notification en aval qui envoie des notifications pour chaque nouvel enregistrement écrit dans le récepteur, le mode Ajout garantit que chaque enregistrement n’est écrit qu’une seule fois. Le mode de mise à jour écrit l’enregistrement chaque fois que les informations d’état changent, ce qui entraînerait de nombreuses mises à jour.
Si les services en aval ont besoin de nouveaux résultats, le mode de mise à jour garantit que votre récepteur reste aussi à jour que possible. Les exemples incluent un modèle Machine Learning qui lit les fonctionnalités en temps réel ou un tableau de bord d’analyse qui effectue le suivi des agrégats en temps réel.
Compatibilité de l’opérateur et du récepteur
Structured Streaming ne prend pas en charge toutes les opérations disponibles dans Apache Spark, et certaines opérations de streaming ne sont pas prises en charge dans tous les modes de sortie. Pour plus d’informations sur les limitations des opérateurs, consultez les documents de diffusion en continu OSS.
Tous les récepteurs ne prennent pas en charge tous les modes de sortie. Delta Lake, qui sauvegarde toutes les tables managées du catalogue Unity, et Kafka prennent en charge tous les modes de sortie. Pour plus d’informations sur la compatibilité des récepteurs, consultez les documents de streaming OSS.
Latence et coût
Le mode sortie a un impact sur le temps nécessaire avant l’écriture d’un enregistrement, et la fréquence et la quantité de données écrites peuvent avoir un impact sur les coûts associés aux pipelines de diffusion en continu.
Le mode Ajout force les opérateurs avec état à émettre des résultats uniquement après la finalisation des résultats avec état, ce qui est au moins aussi long que votre délai de filigrane. Un délai de filigrane de 1 hour
dans le mode de sortie d’ajout signifie que vos enregistrements ont au moins un délai d’une heure avant d’être émis en aval.
Le mode de mise à jour entraîne une écriture par déclencheur par valeur d’agrégation. Si votre récepteur facture par écriture par enregistrement, cela peut être coûteux si les enregistrements sont mis à jour plusieurs fois avant que le délai de filigrane ne passe.
Exemples de configuration
Les exemples de code suivants montrent la configuration du mode de sortie pour la diffusion en continu des mises à jour vers les tables du catalogue Unity :
Python
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)
# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)
# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)
# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)
Scala
// Append output mode (default)
df.writeStream
.toTable("target_table")
// Append output mode (same as default behavior)
df.writeStream
.outputMode("append")
.toTable("target_table")
// Update output mode
df.writeStream
.outputMode("update")
.toTable("target_table")
// Complete output mode
df.writeStream
.outputMode("complete")
.toTable("target_table")
Consultez les documents OSS pour PySpark DataStreamWriter.outputMode ou Scala DataStreamWriter.outputMode.
Exemple de modes de diffusion en continu et de sortie avec état
L’exemple suivant est destiné à vous aider à comprendre comment le mode de sortie interagit avec les filigranes pour la diffusion en continu avec état.
Envisagez une agrégation de diffusion en continu qui calcule le chiffre d’affaires total généré chaque heure dans un magasin avec un délai de filigrane de 15 minutes. Le premier microbatch traite les enregistrements suivants :
- $15 à 14h40
- $10 à 14h30
- $30 à 15h10
À ce stade, le filigrane du moteur est à 14h55, car il soustrait 15 minutes (le délai) de la durée maximale affichée (15h10). L’opérateur d’agrégation de streaming a les éléments suivants dans son état :
[2pm, 3pm]
: $25[3pm, 4pm]
: $30
Le tableau suivant décrit ce qui se passerait dans chaque mode de sortie :
Mode de sortie | Résultat et raison |
---|---|
Ajouter | L’opérateur d’agrégation de streaming n’émet rien en aval. Cela est dû au fait que ces deux fenêtres peuvent changer à mesure que de nouvelles valeurs apparaissent avec un déclencheur suivant : le filigrane à 14h55 indique que les enregistrements après 14h55 peuvent toujours arriver, et ces enregistrements peuvent tomber dans la fenêtre [2pm, 3pm] ou la fenêtre [3pm, 4pm] . |
Update | L’opérateur émet les deux enregistrements, car les deux enregistrements ont reçu des mises à jour. |
Terminé | L’opérateur émet tous les enregistrements. |
Supposons maintenant que le flux reçoit un enregistrement supplémentaire :
- $20 à 15h20
Le filigrane est mis à jour à 15h05, car le moteur soustrait 15 minutes de 15h20. À ce stade, l’opérateur d’agrégation de diffusion en continu a les éléments suivants dans son état :
[2pm, 3pm]
: $25[3pm, 4pm]
: $50
Le tableau suivant décrit ce qui se passerait dans chaque mode de sortie :
Mode de sortie | Résultat et raison |
---|---|
Ajouter | L’opérateur d’agrégation de streaming observe le filigrane de 15h05 est supérieur à la fin de la fenêtre [2pm, 3pm] . Par la définition du filigrane, cette fenêtre ne peut plus changer, de sorte qu’elle émet la fenêtre [2pm, 3pm] . |
Update | L’opérateur d’agrégation de diffusion en continu émet la fenêtre [3pm, 4pm] , car la valeur d’état a changé de $30 à $50. |
Terminé | L’opérateur émet tous les enregistrements. |
Les éléments suivants résument le comportement des opérateurs avec état dans chaque mode d’ajout :
- En mode Ajout, écrivez des enregistrements une fois après le délai de filigrane.
- En mode mise à jour, écrivez des enregistrements qui ont changé depuis le déclencheur précédent.
- En mode complet, écrivez tous les enregistrements jamais produits par l’opérateur avec état.