Partilhar via


O que é streaming stateful?

Uma consulta stateful de Streaming Estruturado requer atualizações incrementais para a informação de estado intermediária, enquanto uma consulta stateless de Streaming Estruturado apenas controla informações sobre quais linhas foram processadas da origem para o coletor.

As operações com estado incluem agregação de streaming, dropDuplicatesde streaming, junções de fluxo a fluxo e aplicações com estado personalizadas.

As informações de estado intermediário necessárias para consultas com estado em streaming estruturado podem causar problemas inesperados de latência e produção se configuradas incorretamente.

No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do changelog com o RocksDB para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho do Structured Streaming. O Databricks recomenda ativar o ponto de verificação do registo de alterações para todas as consultas com monitorização de estado de Transmissão em Fluxo Estruturada. Consulte Ativar ponto de verificação do registo de alterações.

Otimize consultas de streaming estruturado com estado

O gerenciamento das informações de estado intermediário de consultas de Streaming estruturado com monitoração de estado pode ajudar a evitar latência inesperada e problemas de produção.

A Databricks recomenda:

  • Utilize instâncias otimizadas para computação como processos de trabalho.
  • Defina o número de partições aleatórias para 1-2 vezes o número de núcleos no cluster.
  • Defina a configuração spark.sql.streaming.noDataMicroBatches.enabled como false no SparkSession. Isso impede que o mecanismo de streaming de microlotes processe microlotes que não contêm dados. Observe também que definir essa configuração como false pode resultar em operações com monitoração de estado que usam marcas d'água ou tempos limite de processamento para não obter saída de dados até que novos dados cheguem em vez de imediatamente.

O Databricks recomenda o uso do RocksDB com o ponto de verificação do changelog para gerenciar o estado de fluxos com monitoração de estado. Consulte Configurar o armazenamento de estado do RocksDB no Azure Databricks.

Nota

O esquema de gerenciamento de estado não pode ser alterado entre as reinicializações de consulta. Se uma consulta tiver sido iniciada com o gerenciamento padrão, você deverá reiniciá-la do zero com um novo local de ponto de verificação para alterar o armazenamento de estado.

Trabalhe com múltiplos operadores com estado no Structured Streaming

No Databricks Runtime 13.3 LTS e superior, o Azure Databricks oferece suporte avançado para operadores com monitoração de estado em cargas de trabalho de Streaming Estruturado. Agora, pode encadear vários operadores com estado, ou seja, pode encaminhar os resultados de uma operação, como uma agregação em janela, para outra operação com estado, como uma junção.

No Databricks Runtime 16.2 e posteriores, você pode usar transformWithState em cargas de trabalho com vários operadores com estado. Consulte Criar uma aplicação com estado personalizado.

Os exemplos a seguir demonstram vários padrões que você pode usar.

Importante

As seguintes limitações existem ao trabalhar com múltiplos operadores de estado:

  • Não são suportados os operadores personalizados com estado herdados (FlatMapGroupWithState e applyInPandasWithState.
  • Apenas o modo de saída "acrescentar" é suportado.

Agregação de janela de tempo encadeada

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregação por janelas temporais em dois fluxos diferentes seguida de junção de janelas entre fluxos

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Junção de intervalos de tempo entre fluxos seguida de agregação com janela de tempo

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Rebalanceamento de estado para streaming estruturado

O rebalanceamento de estado é habilitado por padrão para todas as cargas de trabalho de streaming na DLT. No Databricks Runtime 11.3 LTS e superior, você pode definir a seguinte opção de configuração na configuração do cluster do Spark para habilitar o reequilíbrio de estado:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

O reequilíbrio de estado beneficia os pipelines de Streaming Estruturado com estado que passam por eventos de redimensionamento de clusters. As operações de streaming sem estado não se beneficiam, independentemente da alteração do tamanho do cluster.

Nota

O dimensionamento automático de computação tem limitações para reduzir o tamanho do cluster para cargas de trabalho de Streaming Estruturado. O Databricks recomenda o uso de DLT com dimensionamento automático aprimorado para cargas de trabalho de streaming. Veja Otimizar a utilização do cluster das DLT pipelines com dimensionamento automático melhorado.

Os eventos de redimensionamento de cluster acionam o reequilíbrio de estado. Os microlotes podem ter maior latência durante eventos de rebalanceamento à medida que o estado é carregado do armazenamento em nuvem para os novos executores.