O que é streaming stateful?
Uma consulta de Streaming Estruturado com monitoração de estado requer atualizações incrementais para informações de estado intermediário, enquanto uma consulta de Streaming Estruturado sem estado rastreia apenas informações sobre quais linhas foram processadas da origem para o coletor.
As operações com estado incluem agregação de streaming, streaming dropDuplicates
, mapGroupsWithState
stream-stream joins e flatMapGroupsWithState
.
As informações de estado intermediário necessárias para consultas de Streaming Estruturado com monitoração de estado podem levar a problemas inesperados de latência e produção se não forem configuradas corretamente.
No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do changelog com o RocksDB para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho do Structured Streaming. O Databricks recomenda ativar o ponto de verificação do registo de alterações para todas as consultas com monitorização de estado de Transmissão em Fluxo Estruturada. Consulte Ativar ponto de verificação do changelog.
Optimize consultas de Streaming estruturado com monitoração de estado
O gerenciamento das informações de estado intermediário de consultas de Streaming estruturado com monitoração de estado pode ajudar a evitar latência inesperada e problemas de produção.
A Databricks recomenda:
- Use instâncias otimizadas para computação como trabalhadores.
- Set defina o número de partições de shuffle para 1 a 2 vezes o número de núcleos no cluster.
-
Set a configuração de
spark.sql.streaming.noDataMicroBatches.enabled
parafalse
no SparkSession. Isso impede que o mecanismo de streaming de microlotes processe microlotes que não contêm dados. Observe também que definir esta configuração parafalse
pode resultar em operações com estado que utilizam marcas d'água ou tempos limite de processamento para não get saída de dados até à chegada de novos dados, em vez de imediatamente.
O Databricks recomenda o uso do RocksDB com o ponto de verificação do changelog para gerenciar o estado de fluxos com monitoração de estado. Consulte Configurar o armazenamento de estado do RocksDB no Azure Databricks.
Nota
O esquema de gerenciamento de estado não pode ser alterado entre as reinicializações de consulta. Ou seja, se uma consulta tiver sido iniciada com o gerenciamento padrão, ela não poderá ser alterada sem iniciar a consulta do zero com um novo local de ponto de verificação.
Trabalhe com vários operadores com monitoração de estado no Structured Streaming
No Databricks Runtime 13.3 LTS e superior, o Azure Databricks oferece suporte avançado para operadores com monitoração de estado em cargas de trabalho de Streaming Estruturado. Agora podes encadear vários operadores com estado, o que significa que podes alimentar a saída de uma operação, como uma agregação em janela, para outra operação com estado, como um join.
Os exemplos a seguir demonstram vários padrões que você pode usar.
Importante
As seguintes limitações existem ao trabalhar com vários operadores com monitoração de estado:
-
FlatMapGroupWithState
não é suportado. - Apenas o modo de saída append é suportado.
Encadeamento temporal window agregação
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
A agregação de window de tempo em dois fluxos diferentes, seguida de windowjoin entre fluxos
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Intervalo de tempo entre fluxos join seguido de agregação de tempo window
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Rebalanceamento de estado para streaming estruturado
O rebalanceamento de estado é habilitado por padrão para todas as cargas de trabalho de streaming no Delta Live Tables. No Databricks Runtime 11.3 LTS ou versões posteriores, você pode set a seguinte opção de configuração na configuração do cluster Spark para habilitar o reequilíbrio de estado:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
O reequilíbrio de estado beneficia os pipelines de Streaming estruturado com estado que passam por eventos de redimensionamento de cluster. As operações de streaming sem estado não se beneficiam, independentemente da alteração do tamanho do cluster.
Nota
O dimensionamento automático de computação tem limitações para reduzir o tamanho do cluster para cargas de trabalho de Streaming Estruturado. A Databricks recomenda o uso do Delta Live Tables com dimensionamento automático aprimorado para cargas de trabalho de streaming. Consulte Optimize a utilização de clusters dos pipelines Delta Live Tables com dimensionamento automático aprimorado.
Os eventos de redimensionamento de cluster fazem com que o rebalanceamento de estado seja acionado. Durante eventos de rebalanceamento, os microlotes podem ter latência maior à medida que o estado é carregado do armazenamento em nuvem para os novos executores.
Especificar o estado inicial para mapGroupsWithState
Você pode especificar um estado inicial definido pelo usuário para o processamento com estado do Streaming Estruturado usando flatMapGroupsWithState
ou mapGroupsWithState
. Isso permite que você evite o reprocessamento de dados ao iniciar um fluxo com monitoração de estado sem um ponto de verificação válido.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
Exemplo de caso de uso que especifica um estado inicial para o flatMapGroupsWithState
operador:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Exemplo de caso de uso que especifica um estado inicial para o mapGroupsWithState
operador:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Testar a função mapGroupsWithState
update
A API TestGroupState
permite testar a função de estado update usada para Dataset.groupByKey(...).mapGroupsWithState(...)
e Dataset.groupByKey(...).flatMapGroupsWithState(...)
.
A função de estado update utiliza um objeto do tipo GroupState
para usar um estado anterior como entrada. Consulte a documentação de referência do Apache Spark GroupState. Por exemplo:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}