Aplicar marcas d'água para controlar limites de processamento de dados
Este artigo apresenta os conceitos básicos de aplicação de marca d'água e fornece recomendações para usar marcas d'água em operações comuns de streaming com estado. Você deve aplicar marcas d'água a operações de streaming com estado para evitar expandir infinitamente a quantidade de dados mantidos no estado, o que poderia apresentar problemas de memória e aumentar as latências de processamento durante operações de streaming de execução prolongada.
O que é uma marca d'água?
O fluxo estruturado usa marcas d'água para controlar o limite por quanto tempo deve continuar processando atualizações para uma determinada entidade de estado. Exemplos comuns de entidades de estado incluem:
- Agregações em uma janela de tempo.
- Chaves exclusivas em uma junção entre dois fluxos.
Ao declarar uma marca d'água, você especifica um campo de carimbo de data/hora e um limite de marca d'água em um DataFrame de streaming. À medida que novos dados chegam, o gerenciador de estado rastreia o carimbo de data/hora mais recente no campo especificado e processa todos os registros dentro do limite de latência.
O exemplo a seguir aplica um limite de marca d'água de 10 minutos a uma contagem em janela:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Neste exemplo:
- A coluna
event_time
é usada para definir uma marca d'água de 10 minutos e uma janela em cascata de 5 minutos. - Uma contagem é coletada para cada
id
observada para cada janela de 5 minutos não sobrepostas. - As informações de estado são mantidas para cada contagem até que o final da janela seja 10 minutos mais antigo do que a
event_time
mais recente observada.
Importante
Os limites de marca d'água garantem que os registros que chegam dentro do limite especificado sejam processados de acordo com a semântica da consulta definida. Registros de chegada tardia que chegam fora do limite especificado ainda podem ser processados usando métricas de consulta, mas isso não é garantido.
Como as marcas d'água afetam o tempo de processamento e a taxa de transferência?
As marcas d'água interagem com os modos de saída a serem controlados quando dados são gravados no coletor. Como as marcas d'água reduzem a quantidade total de informações de estado a serem processadas, o uso efetivo de marcas d'água é essencial para uma taxa de transferência eficiente de streaming com estado.
Observação
Nem todos os modos de saída têm suporte para todas as operações com estado.
Marcas d'água e modo de saída para agregações em janelas
A tabela a seguir detalha o processamento de consultas com agregação em um carimbo de data/hora com uma marca d'água definida:
Modo de saída | Comportamento |
---|---|
Acrescentar | As linhas são gravadas na tabela de destino após o limite da marca d'água ser passado. Todas as gravações são atrasadas com base no limite de atraso. O estado de agregação antigo é descartado depois que o limite é passado. |
Atualizar | As linhas são gravadas na tabela de destino conforme os resultados são calculados e podem ser atualizadas e substituídas à medida que novos dados chegam. O estado de agregação antigo é descartado depois que o limite é passado. |
Concluído | O estado de agregação não é descartado. A tabela de destino é reescrita com cada gatilho. |
Marcas d'água e saída para junções de fluxo-fluxo
As junções entre vários fluxos só dão suporte ao modo de acréscimo e os registros correspondentes são gravados em cada lote descoberto. Para junções internas, o Databricks recomenda definir um limite de marca d'água em cada fonte de dados de streaming. Isso permite que as informações de estado sejam descartadas para registros antigos. Sem marcas d'água, o Streaming Estruturado tenta unir todas as chaves de ambos os lados da junção com cada gatilho.
O Streaming Estruturado tem semântica especial para dar suporte a junções externas. A marca d'água é obrigatória para junções externas, pois indica quando uma chave deve ser gravada com um valor nulo após ficar sem correspondência. Observe que, embora as junções externas possam ser úteis para gravar registros que nunca são correspondidos durante o processamento de dados, pois as junções gravam apenas em tabelas como operações de acréscimo, esses dados ausentes não são registrados até que o limite de atraso tenha passado.
Controlar o limite de dados atrasados com várias políticas de marca-d'água em Streaming Estruturado
Ao trabalhar com várias entradas de Streaming Estruturado, você pode definir várias marcas-d'água para controlar os limites de tolerância para dados de chegada tardia. Configurar marcas-d'água permite controlar informações de estado e afeta a latência.
Uma consulta de streaming pode ter vários fluxos de entrada que são unidos. Cada um dos fluxos de entrada pode ter um limite diferente de dados tardias que precisam ser tolerados para operações com estado. Especifique esses limites usando withWatermarks("eventTime", delay)
em cada um dos fluxos de entrada. Veja a seguir uma consulta de exemplo com junções de fluxo a fluxo.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Durante a execução da consulta, o Streaming Estruturado rastreia individualmente o tempo máximo de evento visto em cada fluxo de entrada, calcula marcas-d'água com base no atraso correspondente e escolhe uma única marca-d'água global com elas para ser usada para operações com estado. Por padrão, o mínimo é escolhido como a marca-d'água global porque garante que nenhum dado seja acidentalmente descartado como tarde demais se um dos fluxos ficar atrás dos outros (por exemplo, um dos fluxos para de receber dados devido a falhas upstream). Em outras palavras, a marca-d'água global se move com segurança no ritmo do fluxo mais lento e a saída da consulta é atrasada de acordo.
Se quiser resultados mais rápidos, você pode definir a política de marca-d'água múltipla para escolher o valor máximo como a marca-d'água global definindo a configuração de SQL spark.sql.streaming.multipleWatermarkPolicy
como max
(o padrão é min
). Isso permite que a marca d'água global se mova no ritmo do fluxo mais rápido. No entanto, essa configuração remove dados dos fluxos mais lentos. Portanto, o Databricks recomenda que você use essa configuração com cuidado.
Descartar duplicatas dentro da marca d'água
No Databricks Runtime 13.3 LTS e versões superiores, você pode eliminar registros duplicados dentro de um limite de marca d'água usando um identificador exclusivo.
O Fluxo Estruturado fornece garantias de processamento exatamente uma vez, mas não elimina automaticamente a duplicação de registros de fontes de dados. Você pode usar dropDuplicatesWithinWatermark
para eliminar a duplicação de registros em qualquer campo especificado, permitindo que você remova duplicatas de um fluxo mesmo que alguns campos sejam diferentes (como hora do evento ou hora de chegada).
Os registros duplicados que chegam na marca d'água especificada têm a garantia de serem descartados. Essa garantia é estrita em apenas uma direção e os registros duplicados que chegam foram do limite especificado também podem ser descartados. Você deve definir o limite de atraso da marca d'água por mais tempo do que as diferenças máximas de carimbo de data/hora entre eventos duplicados para remover todas as duplicatas.
Você deve especificar uma marca d'água para usar o método dropDuplicatesWithinWatermark
, como no exemplo a seguir:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])