Partilhar via


O que é streaming stateful?

Uma consulta de Streaming Estruturado com monitoração de estado requer atualizações incrementais para informações de estado intermediário, enquanto uma consulta de Streaming Estruturado sem estado rastreia apenas informações sobre quais linhas foram processadas da origem para o coletor.

As operações com estado incluem agregação de streaming, streaming dropDuplicates, mapGroupsWithStatestream-stream joins e flatMapGroupsWithState.

As informações de estado intermediário necessárias para consultas de Streaming Estruturado com monitoração de estado podem levar a problemas inesperados de latência e produção se não forem configuradas corretamente.

No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do changelog com o RocksDB para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho do Structured Streaming. O Databricks recomenda ativar o ponto de verificação do registo de alterações para todas as consultas com monitorização de estado de Transmissão em Fluxo Estruturada. Consulte Ativar ponto de verificação do changelog.

Optimize consultas de Streaming estruturado com monitoração de estado

O gerenciamento das informações de estado intermediário de consultas de Streaming estruturado com monitoração de estado pode ajudar a evitar latência inesperada e problemas de produção.

A Databricks recomenda:

  • Use instâncias otimizadas para computação como trabalhadores.
  • Set defina o número de partições de shuffle para 1 a 2 vezes o número de núcleos no cluster.
  • Set a configuração de spark.sql.streaming.noDataMicroBatches.enabled para false no SparkSession. Isso impede que o mecanismo de streaming de microlotes processe microlotes que não contêm dados. Observe também que definir esta configuração para false pode resultar em operações com estado que utilizam marcas d'água ou tempos limite de processamento para não get saída de dados até à chegada de novos dados, em vez de imediatamente.

O Databricks recomenda o uso do RocksDB com o ponto de verificação do changelog para gerenciar o estado de fluxos com monitoração de estado. Consulte Configurar o armazenamento de estado do RocksDB no Azure Databricks.

Nota

O esquema de gerenciamento de estado não pode ser alterado entre as reinicializações de consulta. Ou seja, se uma consulta tiver sido iniciada com o gerenciamento padrão, ela não poderá ser alterada sem iniciar a consulta do zero com um novo local de ponto de verificação.

Trabalhe com vários operadores com monitoração de estado no Structured Streaming

No Databricks Runtime 13.3 LTS e superior, o Azure Databricks oferece suporte avançado para operadores com monitoração de estado em cargas de trabalho de Streaming Estruturado. Agora podes encadear vários operadores com estado, o que significa que podes alimentar a saída de uma operação, como uma agregação em janela, para outra operação com estado, como um join.

Os exemplos a seguir demonstram vários padrões que você pode usar.

Importante

As seguintes limitações existem ao trabalhar com vários operadores com monitoração de estado:

  • FlatMapGroupWithState não é suportado.
  • Apenas o modo de saída append é suportado.

Encadeamento temporal window agregação

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

A agregação de window de tempo em dois fluxos diferentes, seguida de windowjoin entre fluxos

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Intervalo de tempo entre fluxos join seguido de agregação de tempo window

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Rebalanceamento de estado para streaming estruturado

O rebalanceamento de estado é habilitado por padrão para todas as cargas de trabalho de streaming no Delta Live Tables. No Databricks Runtime 11.3 LTS ou versões posteriores, você pode set a seguinte opção de configuração na configuração do cluster Spark para habilitar o reequilíbrio de estado:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

O reequilíbrio de estado beneficia os pipelines de Streaming estruturado com estado que passam por eventos de redimensionamento de cluster. As operações de streaming sem estado não se beneficiam, independentemente da alteração do tamanho do cluster.

Nota

O dimensionamento automático de computação tem limitações para reduzir o tamanho do cluster para cargas de trabalho de Streaming Estruturado. A Databricks recomenda o uso do Delta Live Tables com dimensionamento automático aprimorado para cargas de trabalho de streaming. Consulte Optimize a utilização de clusters dos pipelines Delta Live Tables com dimensionamento automático aprimorado.

Os eventos de redimensionamento de cluster fazem com que o rebalanceamento de estado seja acionado. Durante eventos de rebalanceamento, os microlotes podem ter latência maior à medida que o estado é carregado do armazenamento em nuvem para os novos executores.

Especificar o estado inicial para mapGroupsWithState

Você pode especificar um estado inicial definido pelo usuário para o processamento com estado do Streaming Estruturado usando flatMapGroupsWithStateou mapGroupsWithState. Isso permite que você evite o reprocessamento de dados ao iniciar um fluxo com monitoração de estado sem um ponto de verificação válido.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Exemplo de caso de uso que especifica um estado inicial para o flatMapGroupsWithState operador:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Exemplo de caso de uso que especifica um estado inicial para o mapGroupsWithState operador:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Testar a função mapGroupsWithStateupdate

A API TestGroupState permite testar a função de estado update usada para Dataset.groupByKey(...).mapGroupsWithState(...) e Dataset.groupByKey(...).flatMapGroupsWithState(...).

A função de estado update utiliza um objeto do tipo GroupStatepara usar um estado anterior como entrada. Consulte a documentação de referência do Apache Spark GroupState. Por exemplo:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}