Otimize o processamento com estado em DLT com marcas d'água
Para gerir efetivamente os dados armazenados em estado, utilize marcas d'água ao executar o processamento de fluxo com estado em DLT, incluindo agregações, junção e desduplicação. Este artigo descreve como usar marcas d'água nas suas consultas DLT e inclui exemplos de operações recomendadas.
Observação
Para garantir que as consultas que executam agregações sejam processadas incrementalmente e não totalmente recalculadas a cada atualização, você deve usar marcas d'água.
O que é uma marca d'água?
No processamento de fluxo, uma marca d'água é uma funcionalidade do Apache Spark que pode definir um limite baseado em tempo para processar dados ao realizar operações com estado, como agregações. Os dados que chegam são processados até que o limite seja atingido, momento em que a janela de tempo definida pelo limite é fechada. As marcas d'água podem ser usadas para evitar problemas durante o processamento de consultas, principalmente ao processar conjuntos de dados maiores ou processamento de longa duração. Esses problemas podem incluir alta latência na produção de resultados e até mesmo erros de falta de memória (OOM) devido à quantidade de dados mantidos no estado durante o processamento. Como os dados de streaming são inerentemente não ordenados, os marcadores temporais também suportam operações de cálculo corretas, como agregações em janelas de tempo.
Para saber mais sobre como usar marcas d'água no processamento de fluxo, consulte Marcas d'água no Apache Spark Structured Streaming e Aplicar marcas d'água para controlar limites de processamento de dados.
Como se define uma marca d'água?
Você define uma marca d'água especificando um campo de carimbo de data/hora e um valor que representa o limite de tempo para de dados atrasados chegarem. Os dados são considerados tardios se chegarem após o limite de tempo definido. Por exemplo, se o limite for definido como 10 minutos, os registros que chegarem após o limite de 10 minutos poderão ser descartados.
Como os registros que chegam após o limite definido podem ser descartados, selecionar um limite que atenda aos seus requisitos de latência versus correção é importante. A escolha de um limiar mais pequeno resulta na emissão de registos mais cedo, mas também significa que os registos tardios têm maior probabilidade de serem descartados. Um limiar mais elevado significa uma espera mais longa, mas possivelmente uma maior exaustividade dos dados. Devido ao tamanho maior do estado, um limite maior também pode exigir recursos de computação adicionais. Como o valor do limite depende dos dados e dos requisitos de processamento, testar e monitorar o processamento é importante para determinar um limite ideal.
Você usa a função withWatermark()
em Python para definir uma marca d'água. Em SQL, utilize a cláusula WATERMARK
para definir uma marca d'água:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Use marcas d'água com junções de córrego
Para uniões stream-stream, deve-se definir uma marca temporal em ambos os lados da junção e uma cláusula de intervalo de tempo. Como cada fonte de junção tem uma visualização incompleta dos dados, a cláusula de intervalo de tempo é necessária para informar ao mecanismo de streaming quando nenhuma outra correspondência pode ser feita. A cláusula de intervalo de tempo deve usar os mesmos campos usados para definir as marcas d'água.
Como pode haver momentos em que cada fluxo requer limites diferentes para marcas d'água, os córregos não precisam ter os mesmos limites. Para evitar a perda de dados, o mecanismo de streaming mantém uma marca temporal global com base no fluxo mais lento.
O exemplo a seguir une um fluxo de impressões de anúncios e um fluxo de cliques do usuário em anúncios. Neste exemplo, um clique deve ocorrer dentro de 3 minutos após a impressão. Depois que o período de 3 minutos decorre, as linhas do estado atual que já não podem ser emparelhadas são descartadas.
Python
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Executar agregações em janela com marcas d'água
Uma operação com estado comum em streaming de dados é uma agregação em janela. As agregações em janela são semelhantes às agregações agrupadas, exceto que os valores agregados são retornados para o conjunto de linhas que fazem parte da janela definida.
Uma janela pode ser definida como um determinado comprimento, e uma operação de agregação pode ser executada em todas as linhas que fazem parte dessa janela. O Spark Streaming suporta três tipos de janelas:
- Janelas de tombamento (fixas): Uma série de intervalos de tempo de tamanho fixo, não sobrepostos e contíguos. Um registro de entrada pertence a apenas uma única janela.
- Janelas de correr: Semelhante às janelas tombadas, as janelas de correr são de tamanho fixo, mas as janelas podem se sobrepor e um registro pode cair em várias janelas.
Quando os dados chegam depois do final da janela mais o comprimento da marca d'água, nenhum novo dado é aceito para a janela, o resultado da agregação é emitido e o estado da janela é descartado.
O exemplo a seguir calcula uma soma de impressões a cada 5 minutos usando uma janela fixa. Neste exemplo, a cláusula select usa o alias impressions_window
e, em seguida, a própria janela é definida como parte da cláusula GROUP BY
. A janela deve ser baseada na mesma coluna de timestamp que a marca d'água, a coluna clickTimestamp
neste exemplo.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Um exemplo semelhante em Python para determinar o lucro em intervalos horários fixos.
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Eliminar a duplicação de registos de transmissão
O Streaming Estruturado garante processamento exato uma só vez, mas não elimina automaticamente a duplicação de registos de fontes de dados. Por exemplo, porque muitas filas de mensagens têm garantias de pelo menos uma entrega, registos duplicados devem ser esperados ao ler de uma dessas filas de mensagens. Você pode usar a função dropDuplicatesWithinWatermark()
para eliminar a duplicação de registros em qualquer campo especificado, removendo duplicatas de um fluxo, mesmo que alguns campos sejam diferentes (como hora do evento ou hora de chegada). Você deve especificar uma marca d'água para usar a função dropDuplicatesWithinWatermark()
. Todos os dados duplicados que chegam dentro do intervalo de tempo especificado pela marca d'água são descartados.
Os dados ordenados são importantes porque os dados fora de ordem fazem com que o valor de referência avance incorretamente. Então, quando os dados mais antigos chegam, eles são considerados tardios e descartados. Use a opção withEventTimeOrder
para processar o instantâneo inicial por ordem com base no carimbo de data/hora especificado na marca d'água. A opção withEventTimeOrder
pode ser declarada no código que define o conjunto de dados ou nas configurações de pipeline usando spark.databricks.delta.withEventTimeOrder.enabled
. Por exemplo:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Observação
A opção withEventTimeOrder
é suportada apenas com Python.
No exemplo a seguir, os dados são processados ordenados por clickTimestamp
e os registros que chegam dentro de 5 segundos uns dos outros que contêm colunas userId
e clickAdId
duplicadas são descartados.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Otimize a configuração do pipeline para processamento com monitoração de estado
Para ajudar a evitar problemas de produção e latência excessiva, a Databricks recomenda habilitar a gestão de estado baseada em RocksDB para o seu processamento de fluxo com estado, especialmente se o processamento requerer salvar uma grande quantidade de estado intermediário.
Pipelines sem servidor gerem automaticamente as configurações de armazenamento de estado.
Você pode habilitar o gerenciamento de estado baseado em RocksDB definindo a seguinte configuração antes de implantar um pipeline:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Para saber mais sobre o armazenamento de estado RocksDB, incluindo recomendações de configuração para RocksDB, consulte Configurar armazenamento de estado RocksDB no Azure Databricks.