Utiliser foreachBatch pour écrire dans des récepteurs de données arbitraires
Cet article traite de l’utilisation de foreachBatch
avec le flux structuré pour écrire la sortie d'une requête de streaming vers des sources de données qui ne disposent pas d'un récepteur de streaming existant.
Le modèle de code streamingDF.writeStream.foreachBatch(...)
vous permet d’appliquer des fonctions de lot aux données de sortie de chaque micro-batch de la requête de streaming. Fonctions utilisées avec foreachBatch
qui admettent deux paramètres :
- Un DataFrame contenant les données de sortie d’un micro-lot.
- L’identifiant unique du micro-lot.
Vous devez utiliser foreachBatch
pour les opérations de fusion de Delta Lake dans le flux structuré. Consultez Upsert à partir de requêtes de diffusion en continu à l’aide de foreachBatch.
Appliquer d’autres opérations DataFrame
De nombreuses opérations DataFrame et DataSet ne sont pas prises en charge dans les DataFrames de streaming, Spark ne prend pas en charge la génération de plans incrémentiels dans ces cas. En utilisant foreachBatch()
, vous pouvez appliquer certaines de ces opérations sur chaque sortie de micro-batch. Par exemple, vous pouvez utiliser foreachBath()
et l'opération SQL MERGE INTO
pour écrire la sortie des agrégations de streaming dans une table Delta en mode de mise à jour. Pour plus de détails, voir MERGE INTO.
Important
foreachBatch()
ne fournit que des garanties d'écriture au moins une fois. Mais vous pouvez utiliser lebatchId
fourni à la fonction pour dédupliquer la sortie et obtenir une garantie une seule fois. Dans les deux cas, vous devrez définir vous-même la sémantique de bout en bout.foreachBatch()
ne fonctionne pas avec le mode de traitement continu car il repose fondamentalement sur l'exécution par micro-batch d'une requête de streaming. Si vous écrivez des données en mode continu, utilisezforeach()
à la place.
Un dataframe vide peut être appelé avec foreachBatch()
et le code utilisateur doit être résilient pour permettre un fonctionnement approprié. En voici un exemple :
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Changements de comportement pour foreachBatch
dans Databricks Runtime 14.0
Dans Databricks Runtime 14.0 et versions ultérieures sur le calcul configuré avec un mode d’accès partagé, les modifications de comportement suivantes s’appliquent :
- Les commandes
print()
écrivent la sortie dans les journaux du pilote. - Vous ne pouvez pas accéder au sous-module
dbutils.widgets
à l’intérieur de la fonction. - Tous les fichiers, modules ou objets référencés dans la fonction doivent être sérialisables et disponibles sur Spark.
Réutiliser les sources de données de lot existantes
En utilisant foreachBatch()
, vous pouvez utiliser les rédacteurs de données par lots existants pour les récepteurs de données qui ne prennent pas en charge la diffusion en flux structuré. Voici quelques exemples :
De nombreuses autres sources de données par lots peuvent être utilisées à partir de foreachBatch()
. Consultez Se connecter aux sources de données.
Écrire dans plusieurs emplacements
Si vous devez écrire la sortie d'une requête en continu à plusieurs emplacements, Databricks recommande d'utiliser plusieurs rédacteurs en flux structuré pour une meilleure parallélisation et un meilleur débit.
L’utilisation de foreachBatch
pour écrire dans plusieurs récepteurs sérialise l’exécution des écritures en streaming, ce qui peut augmenter la latence pour chaque micro-lot.
Si vous utilisez foreachBatch
pour écrire dans plusieurs tables Delta, consultez Écritures de table idempotentes dans foreachBatch.