Partilhar via


O que é streaming stateful?

Uma consulta de Streaming Estruturado com monitoração de estado requer atualizações incrementais para informações de estado intermediário, enquanto uma consulta de Streaming Estruturado sem estado rastreia apenas informações sobre quais linhas foram processadas da origem para o coletor.

As operações com estado incluem agregação de streaming, streaming dropDuplicates, mapGroupsWithStatestream-stream joins e flatMapGroupsWithState.

As informações de estado intermediário necessárias para consultas de Streaming Estruturado com monitoração de estado podem levar a problemas inesperados de latência e produção se não forem configuradas corretamente.

No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do changelog com o RocksDB para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho do Structured Streaming. O Databricks recomenda ativar o ponto de verificação do registo de alterações para todas as consultas com monitorização de estado de Transmissão em Fluxo Estruturada. Consulte Ativar ponto de verificação do changelog.

Otimize consultas de Streaming estruturado com monitoração de estado

O gerenciamento das informações de estado intermediário de consultas de Streaming estruturado com monitoração de estado pode ajudar a evitar latência inesperada e problemas de produção.

A Databricks recomenda:

  • Use instâncias otimizadas para computação como trabalhadores.
  • Defina o número de partições aleatórias para 1-2 vezes o número de núcleos no cluster.
  • Defina a spark.sql.streaming.noDataMicroBatches.enabled configuração como false no SparkSession. Isso impede que o mecanismo de streaming de microlotes processe microlotes que não contêm dados. Observe também que definir essa configuração como false pode resultar em operações com monitoração de estado que aproveitam marcas d'água ou tempos limite de processamento para não obter saída de dados até que novos dados cheguem, em vez de imediatamente.

O Databricks recomenda o uso do RocksDB com o ponto de verificação do changelog para gerenciar o estado de fluxos com monitoração de estado. Consulte Configurar o armazenamento de estado do RocksDB no Azure Databricks.

Nota

O esquema de gerenciamento de estado não pode ser alterado entre as reinicializações de consulta. Ou seja, se uma consulta tiver sido iniciada com o gerenciamento padrão, ela não poderá ser alterada sem iniciar a consulta do zero com um novo local de ponto de verificação.

Trabalhe com vários operadores com monitoração de estado no Structured Streaming

No Databricks Runtime 13.3 LTS e superior, o Azure Databricks oferece suporte avançado para operadores com monitoração de estado em cargas de trabalho de Streaming Estruturado. Agora você pode encadear vários operadores com monitoração de estado, o que significa que você pode alimentar a saída de uma operação, como uma agregação em janela, para outra operação com monitoração de estado, como uma junção.

Os exemplos a seguir demonstram vários padrões que você pode usar.

Importante

As seguintes limitações existem ao trabalhar com vários operadores com monitoração de estado:

  • FlatMapGroupWithState não é suportado.
  • Apenas o modo de saída append é suportado.

Agregação de janela de tempo encadeada

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregação de janela de tempo em dois fluxos diferentes seguida de junção de janela de fluxo de fluxo

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Junção de intervalo de tempo de fluxo de fluxo seguida de agregação de janela de tempo

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Rebalanceamento de estado para streaming estruturado

O rebalanceamento de estado é habilitado por padrão para todas as cargas de trabalho de streaming no Delta Live Tables. No Databricks Runtime 11.3 LTS e superior, você pode definir a seguinte opção de configuração na configuração do cluster do Spark para habilitar o reequilíbrio de estado:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

O reequilíbrio de estado beneficia os pipelines de Streaming estruturado com estado que passam por eventos de redimensionamento de cluster. As operações de streaming sem estado não se beneficiam, independentemente da alteração do tamanho do cluster.

Nota

O dimensionamento automático de computação tem limitações para reduzir o tamanho do cluster para cargas de trabalho de Streaming Estruturado. O Databricks recomenda a utilização do Delta Live Tables com dimensionamento automático melhorado para cargas de trabalho de transmissão em fluxo. Consulte Otimizar a utilização de cluster de pipelines Delta Live Tables com dimensionamento automático aprimorado.

Os eventos de redimensionamento de cluster fazem com que o rebalanceamento de estado seja acionado. Durante eventos de rebalanceamento, os microlotes podem ter latência maior à medida que o estado é carregado do armazenamento em nuvem para os novos executores.

Especificar o estado inicial para mapGroupsWithState

Você pode especificar um estado inicial definido pelo usuário para o processamento com estado do Streaming Estruturado usando flatMapGroupsWithStateou mapGroupsWithState. Isso permite que você evite o reprocessamento de dados ao iniciar um fluxo com monitoração de estado sem um ponto de verificação válido.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Exemplo de caso de uso que especifica um estado inicial para o flatMapGroupsWithState operador:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Exemplo de caso de uso que especifica um estado inicial para o mapGroupsWithState operador:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Testar a mapGroupsWithState função de atualização

A TestGroupState API permite testar a função de atualização de estado usada para Dataset.groupByKey(...).mapGroupsWithState(...) e Dataset.groupByKey(...).flatMapGroupsWithState(...).

A função de atualização de estado usa o estado anterior como entrada usando um objeto do tipo GroupState. Consulte a documentação de referência do Apache Spark GroupState. Por exemplo:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}