Qu’est-ce que la diffusion en continu avec état ?
Une requête de Structured Streaming avec état nécessite des mises à jour incrémentielles d’informations d’état intermédiaires, tandis qu’une requête de Structured Streaming sans état effectue uniquement le suivi des informations sur les lignes traitées entre la source et le récepteur.
Les opérations avec état incluent l’agrégation de diffusion en continu, les dropDuplicates
de diffusion en continu, les jointures flux-flux, mapGroupsWithState
et flatMapGroupsWithState
.
Les informations d’état intermédiaire requises pour les requêtes de Structured Streaming avec état peuvent conduire à des problèmes inattendus de latence et de production si elles ne sont pas correctement configurées.
Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez activer le point de contrôle du journal des modifications avec RocksDB pour réduire la durée des points de contrôle et la latence de bout en bout pour les charges de travail de flux structuré. Databricks recommande d’activer le point de contrôle du journal des modifications pour toutes les requêtes avec état Flux structuré. Voir Activer le point de contrôle du journal des modifications.
Optimiser des requêtes Structured Streaming avec état
La gestion des informations d’état intermédiaire des requêtes de Structured Streaming avec état peut aider à éviter des problèmes inattendus de latence et de production.
Les recommandations de Databricks sont les suivantes :
- Utilisez des instances optimisées pour le calcul en tant que workers.
- Définissez le nombre de partitions aléatoires sur 1 à 2 fois le nombre de cœurs dans le cluster.
- Définissez la configuration
spark.sql.streaming.noDataMicroBatches.enabled
surfalse
dans la SparkSession. Cela empêche le moteur de traitement par micro-lots de diffusion en continu de traiter des micro-lots qui ne contiennent pas de données. Notez également que la définition de cette configuration surfalse
peut entraîner des opérations avec état qui tirent parti des limites ou des délais de traitement pour ne pas obtenir de sortie de données tant que de nouvelles données n’arrivent, plutôt que des les obtenir immédiatement.
Databricks recommande l’utilisation de RocksDB avec les points de contrôle du journal des modifications pour gérer l’état des flux avec état. Consultez Configurer un stockage d’état RocksDB sur Azure Databricks.
Remarque
Le schéma de gestion d’état ne peut pas être modifié entre les redémarrages de la requête. Autrement dit, si une requête a été démarrée avec la gestion par défaut, il n’est possible de la modifier qu’en la démarrant à partir de zéro avec un nouvel emplacement de point de contrôle.
Utiliser plusieurs opérateurs avec état dans Structured Streaming
Dans Databricks Runtime 13.3 LTS et versions ultérieures, Azure Databricks offre une prise en charge avancée des opérateurs avec état dans les charges de travail Flux structuré. Vous pouvez désormais chaîner plusieurs opérateurs avec état, ce qui signifie que vous pouvez alimenter la sortie d’une opération telle qu’une agrégation fenêtrée vers une autre opération avec état telle qu’une jointure.
Les exemples suivants illustrent plusieurs modèles que vous pouvez utiliser.
Important
Les limitations suivantes existent lors de l’utilisation de plusieurs opérateurs avec état :
- La fonction
FlatMapGroupWithState
n'est pas prise en charge. - Seul le mode de sortie d’ajout est pris en charge.
Agrégation de fenêtres de temps chaînées
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Agrégation de fenêtres de temps dans deux flux différents suivis d’une jointure de fenêtre flux-flux
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Jointure de l’intervalle de temps flux-flux suivie de l’agrégation de fenêtre de temps
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Rééquilibrage d’état pour le flux structuré
Le rééquilibrage d’état est activé par défaut pour toutes les charges de travail de streaming dans les tables Delta Live. Dans Databricks Runtime 11.3 LTS et versions ultérieures, vous pouvez définir l’option de configuration suivante dans la configuration du cluster Spark pour activer le rééquilibrage de l’état :
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Le rééquilibrage d’état bénéficie aux pipelines Structured Streaming avec état qui connaissent des événements de redimensionnement de cluster. Les opérations de streaming sans état n’en bénéficient pas, même si la taille de cluster varie.
Remarque
La mise à l’échelle automatique du calcul présente des limitations pour la réduction de la taille du cluster pour les charges de travail Structured Streaming. Databricks recommande d’utiliser Delta Live Tables avec mise à l’échelle automatique améliorée pour les charges de travail de diffusion en continu. Consultez Optimiser l’utilisation du cluster des pipelines Delta Live Tables avec mise à l’échelle automatique améliorée.
Un événement de redimensionnement de cluster entraîne le déclenchement d’un rééquilibrage d’état. Lors d’événements de rééquilibrage, vous pouvez noter que la latence d’un micro-lot peut être plus élevée lors du chargement de l’état du stockage cloud vers le nouvel exécuteur.
Spécifier l’état initial pour mapGroupsWithState
Vous pouvez spécifier un état initial défini par l’utilisateur pour le traitement avec état de Structured Streaming à l’aide de flatMapGroupsWithState
ou mapGroupsWithState
. Cela vous permet d’éviter de re-traiter des données au démarrage d’un flux avec état sans point de contrôle valide.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
Exemple de cas d’usage spécifiant un état initial pour l’opérateur flatMapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Exemple de cas d’usage spécifiant un état initial pour l’opérateur mapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Tester la fonction de mise à jour mapGroupsWithState
LAPI TestGroupState
vous permet de tester la fonction de mise à jour d’état utilisée pour Dataset.groupByKey(...).mapGroupsWithState(...)
et Dataset.groupByKey(...).flatMapGroupsWithState(...)
.
La fonction de mise à jour d’état prend l’état précédent comme entrée en utilisant un objet de type GroupState
. Consultez la documentation de référence sur GroupState d’Apache Spark. Par exemple :
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}