Partilhar via


Otimize o processamento stateful em Delta Live Tables com marcas d'água

Para gerenciar efetivamente os dados mantidos no estado, use marcas d'água ao executar o processamento de fluxo com estado em Delta Live Tables, incluindo agregações, junções e desduplicação. Este artigo descreve como usar marcas d'água em suas consultas Delta Live Tables e inclui exemplos das operações recomendadas.

Nota

Para garantir que as consultas que executam agregações sejam processadas incrementalmente e não totalmente recalculadas a cada atualização, você deve usar marcas d'água.

O que é uma marca d'água?

No processamento de fluxo, uma marca d'água é um recurso do Apache Spark que pode definir um limite baseado em tempo para processar dados ao executar operações com monitoração de estado, como agregações. Os dados que chegam são processados até que o limite seja atingido, momento em que a janela de tempo definida pelo limite é fechada. As marcas d'água podem ser usadas para evitar problemas durante o processamento de consultas, principalmente ao processar conjuntos de dados maiores ou processamento de longa duração. Esses problemas podem incluir alta latência na produção de resultados e até mesmo erros de falta de memória (OOM) devido à quantidade de dados mantidos no estado durante o processamento. Como os dados de streaming são inerentemente não ordenados, as marcas d'água também suportam operações de cálculo corretas, como agregações de janela de tempo.

Para saber mais sobre como usar marcas d'água no processamento de fluxo, consulte Marca d'água no Apache Spark Structured Streaming e Aplicar marcas d'água para controlar limites de processamento de dados.

Como se define uma marca d'água?

Você define uma marca d'água especificando um campo de carimbo de data/hora e um valor que representa o limite de tempo para a chegada de dados atrasados. Os dados são considerados tardios se chegarem após o limite de tempo definido. Por exemplo, se o limite for definido como 10 minutos, os registros que chegarem após o limite de 10 minutos poderão ser descartados.

Como os registros que chegam após o limite definido podem ser descartados, selecionar um limite que atenda aos seus requisitos de latência versus correção é importante. A escolha de um limiar mais pequeno resulta na emissão de registos mais cedo, mas também significa que os registos tardios têm maior probabilidade de serem descartados. Um limiar mais elevado significa uma espera mais longa, mas possivelmente uma maior exaustividade dos dados. Devido ao tamanho maior do estado, um limite maior também pode exigir recursos de computação adicionais. Como o valor do limite depende dos dados e dos requisitos de processamento, testar e monitorar o processamento é importante para determinar um limite ideal.

Você usa a withWatermark() função em Python para definir uma marca d'água. Em SQL, use a WATERMARK cláusula para definir uma marca d'água:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Usar marcas d'água com junções de fluxo

Para junções stream-stream, você deve definir uma marca d'água em ambos os lados da junção e uma cláusula de intervalo de tempo. Como cada fonte de junção tem uma visualização incompleta dos dados, a cláusula de intervalo de tempo é necessária para informar ao mecanismo de streaming quando nenhuma outra correspondência pode ser feita. A cláusula de intervalo de tempo deve usar os mesmos campos usados para definir as marcas d'água.

Como pode haver momentos em que cada fluxo requer limites diferentes para marcas d'água, os córregos não precisam ter os mesmos limites. Para evitar a falta de dados, o mecanismo de streaming mantém uma marca d'água global com base no fluxo mais lento.

O exemplo a seguir une um fluxo de impressões de anúncios e um fluxo de cliques do usuário em anúncios. Neste exemplo, um clique deve ocorrer dentro de 3 minutos após a impressão. Depois que o intervalo de tempo de 3 minutos passa, as linhas do estado que não podem mais ser correspondidas são descartadas.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Executar agregações em janela com marcas d'água

Uma operação stateful comum em streaming de dados é uma agregação em janela. As agregações em janela são semelhantes às agregações agrupadas, exceto que os valores agregados são retornados para o conjunto de linhas que fazem parte da janela definida.

Uma janela pode ser definida como um determinado comprimento, e uma operação de agregação pode ser executada em todas as linhas que fazem parte dessa janela. O Spark Streaming suporta três tipos de janelas:

  • Janelas de tombamento (fixas): uma série de intervalos de tempo de tamanho fixo, não sobrepostos e contíguos. Um registro de entrada pertence a apenas uma única janela.
  • Janelas de correr: Semelhante às janelas tombadas, as janelas de correr são de tamanho fixo, mas as janelas podem se sobrepor e um registro pode cair em várias janelas.

Quando os dados passam do final da janela mais o comprimento da marca d'água, nenhum novo dado é aceito para a janela, o resultado da agregação é emitido e o estado da janela é descartado.

O exemplo a seguir calcula uma soma de impressões a cada 5 minutos usando uma janela fixa. Neste exemplo, a cláusula select usa o alias impressions_windowe, em seguida, a própria janela é definida como parte da GROUP BY cláusula. A janela deve ser baseada na mesma coluna de carimbo de data/hora que a marca d'água, a clickTimestamp coluna neste exemplo.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Um exemplo semelhante em Python para calcular o lucro em janelas fixas por hora:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Eliminar a duplicação de registos de transmissão

O Streaming Estruturado tem garantias de processamento exatamente uma vez, mas não elimina automaticamente a duplicação de registros de fontes de dados. Por exemplo, como muitas filas de mensagens têm pelo menos uma vez garantias, registros duplicados devem ser esperados ao ler de uma dessas filas de mensagens. Você pode usar a função para eliminar a dropDuplicatesWithinWatermark() duplicação de registros em qualquer campo especificado, removendo duplicatas de um fluxo, mesmo que alguns campos sejam diferentes (como hora do evento ou hora de chegada). Você deve especificar uma marca d'água para usar a dropDuplicatesWithinWatermark() função. Todos os dados duplicados que chegam dentro do intervalo de tempo especificado pela marca d'água são descartados.

Os dados ordenados são importantes porque os dados fora de ordem fazem com que o valor da marca d'água avance incorretamente. Então, quando os dados mais antigos chegam, eles são considerados tardios e descartados. Use a withEventTimeOrder opção para processar o instantâneo inicial em ordem com base no carimbo de data/hora especificado na marca d'água. A withEventTimeOrder opção pode ser declarada no código que define o conjunto de dados ou nas configurações de pipeline usando spark.databricks.delta.withEventTimeOrder.enabled. Por exemplo:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Nota

A withEventTimeOrder opção é suportada apenas com Python.

No exemplo a seguir, os dados são processados ordenados pela clickTimestamp, e os registros que chegam dentro de 5 segundos uns dos outros que contêm duplicados userId e clickAdId colunas são descartados.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Otimize a configuração do pipeline para processamento com monitoração de estado

Para ajudar a evitar problemas de produção e latência excessiva, o Databricks recomenda habilitar o gerenciamento de estado baseado em RocksDB para seu processamento de fluxo com monitoração de estado, especialmente se o processamento exigir salvar uma grande quantidade de estado intermediário.

Pipelines sem falhas gerenciam automaticamente as configurações de armazenamento de estado.

Você pode habilitar o gerenciamento de estado baseado em RocksDB definindo a seguinte configuração antes de implantar um pipeline:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Para saber mais sobre o armazenamento de estado RocksDB, incluindo recomendações de configuração para RocksDB, consulte Configurar armazenamento de estado RocksDB no Azure Databricks.