Compartilhar via


Transmita registros para serviços externos com coletores DLT

Importante

A API sink DLT está em Visualização Pública .

Este artigo descreve a API DLT sink e como utilizá-la com os fluxos DLT para gravar registos transformados por um pipeline num sistema de armazenamento de dados externo, como tabelas geridas e externas do Unity Catalog, tabelas do Hive Metastore e serviços de transmissão de eventos, como Apache Kafka ou Hubs de Eventos do Azure.

O que são coletores DLT?

Os sumidouros de DLT permitem-lhe escrever dados transformados em destinos, tais como serviços de streaming de eventos, como o Apache Kafka ou os Hubs de Eventos do Azure, e tabelas externas geridas pelo Unity Catalog ou pelo metastore do Hive. Anteriormente, as tabelas de streaming e vistas materializadas criadas num pipeline de DLT podiam ser persistidas apenas em tabelas Delta geridas pelo Azure Databricks. Usando coletores, agora você tem mais opções para persistir a saída de seus pipelines DLT.

Quando devo usar os dissipadores DLT?

A Databricks recomenda o uso de coletores DLT se você precisar:

  • Crie um caso de uso operacional, como deteção de fraudes, análises em tempo real e recomendações de clientes. Os casos de uso operacionais normalmente leem dados de um barramento de mensagens, como um tópico no Apache Kafka, processam dados com baixa latência e escrevem os registos processados de volta para um barramento de mensagens. Essa abordagem permite que você obtenha latência mais baixa ao não escrever ou ler a partir do armazenamento em nuvem.
  • Escreva dados transformados de seus fluxos DLT em tabelas gerenciadas por uma instância Delta externa, incluindo tabelas gerenciadas e externas do Unity Catalog e tabelas de metastore do Hive.
  • Execute ETL (reverse extract-transform-load) em coletores externos ao Databricks, como tópicos do Apache Kafka. Essa abordagem permite que você ofereça suporte eficaz a casos de uso em que os dados precisam ser lidos ou usados fora das tabelas do Unity Catalog ou de outro armazenamento gerenciado pelo Databricks.

Como posso utilizar os lavatórios DLT?

Observação

  • Somente consultas de streaming usando spark.readStream e dlt.read_stream são suportadas. Não há suporte para consultas em lote.
  • Apenas append_flow pode ser usado para gravar em pias. Outros fluxos, como apply_changes, não são suportados.
  • A execução de uma atualização completa não limpa os dados de resultados computados anteriormente nos destinos de dados. Isso significa que todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.

À medida que os dados de eventos são ingeridos de uma fonte de streaming no pipeline de DLT, você processa e refina esses dados usando a funcionalidade DLT e, em seguida, usa o processamento de fluxo de adição para transmitir os registos de dados transformados para um coletor de DLT. Você cria esse coletor usando a função create_sink(). Para obter mais detalhes sobre como usar a função create_sink, consulte a referência da API do coletor .

Para implementar um coletor de DLT, use as seguintes etapas:

  1. Configure um pipeline de DLT para processar os dados de eventos de streaming e preparar registros de dados para gravação em um coletor de DLT.
  2. Configure e crie o coletor DLT para usar o formato de coletor de destino preferido.
  3. Use um de fluxo de acréscimo de para gravar os registros preparados no coletor.

Essas etapas são abordadas no restante do tópico.

Configurar um pipeline DLT para preparar registros para gravação em um coletor

A primeira etapa é configurar um pipeline DLT para transformar os dados brutos do fluxo de eventos nos dados preparados que você gravará no coletor.

Para entender melhor este processo, pode-se seguir este exemplo de um pipeline de DLT que processa dados de eventos de fluxo de cliques a partir dos dados de exemplo wikipedia-datasets no Databricks. Este pipeline analisa o conjunto de dados brutos para identificar páginas da Wikipédia que vinculam a uma página de documentação do Apache Spark e refina progressivamente esses dados apenas para as linhas da tabela onde o link de referência contém Apache_Spark.

Neste exemplo, o pipeline DLT é estruturado utilizando a arquitetura medallion, que organiza os dados em diferentes camadas para melhorar a qualidade e a eficiência do processamento.

Para começar, carregue os registros JSON brutos do conjunto de dados em sua camada bronze usando Auto Loader. Este código Python demonstra como criar uma tabela de streaming chamada clickstream_raw, que contém os dados brutos e não processados da fonte:

import dlt

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dlt.table(
 comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
 table_properties={
   "quality": "bronze"
 }
)
def clickstream_raw():
 return (
   spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
 )

Depois que esse código é executado, os dados agora estão no nível "bronze" (ou "dados brutos") da arquitetura Medallion e devem ser limpos. A próxima etapa refina os dados para o nível "prata", que envolve a higienização dos tipos de dados e nomes de colunas e o uso de expectativas DLT para garantir a integridade dos dados.

O código a seguir demonstra como fazer isso limpando e validando os dados da camada de bronze na tabela clickstream_clean prata:

@dlt.table(
 comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
 table_properties={
   "quality": "silver"
 }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
 return (
   spark.readStream.table("clickstream_raw")
     .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
     .withColumn("click_count", expr("CAST(n AS INT)"))
     .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
     .withColumnRenamed("curr_title", "current_page_title")
     .withColumnRenamed("prev_title", "previous_page_title")
     .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
 )

Para desenvolver a camada "ouro" da estrutura do pipeline, filtre os dados de clickstream limpos para isolar entradas onde a página de referência é Apache_Spark. Neste último exemplo de código, você seleciona apenas as colunas necessárias para gravar na tabela de coletor de destino.

O código a seguir ilustra como criar uma tabela chamada spark_referrers que representa a camada ouro:

@dlt.table(
 comment="A table of the most common pages that link to the Apache Spark page.",
 table_properties={
   "quality": "gold"
 }
)
def spark_referrers():
 return (
   spark.readStream.table("clickstream_clean")
     .filter(expr("current_page_title == 'Apache_Spark'"))
     .withColumnRenamed("previous_page_title", "referrer")
     .select("referrer", "current_page_id", "current_page_title", "click_count")
 )

Após a conclusão desse processo de preparação de dados, você deve configurar os coletores de destino nos quais os registros limpos serão gravados.

Configurar um coletor de DLT

O Databricks suporta três tipos de coletores de destino nos quais você grava seus registros processados a partir de seus dados de fluxo:

  • Pias de mesa delta
  • Pias Apache Kafka
  • Destinos dos Hubs de Eventos do Azure

Abaixo estão exemplos de configurações para coletores de Hubs de Eventos Delta, Kafka e Azure:

Sumidouros Delta

Para criar um depósito Delta usando o caminho do arquivo:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

Para criar um coletor Delta por nome de tabela usando um catálogo totalmente qualificado e um caminho de esquema:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "my_catalog.my_schema.my_table" }
)

Kafka e Azure Event Hubs coletores

Esse código funciona para coletores do Apache Kafka e dos Hubs de Eventos do Azure.

topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")

eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
  + f' required username="$ConnectionString" password="{connection_string}";'

dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": topic_name
  }
)

Agora que o coletor está configurado e o pipeline DLT está preparado, você pode começar a transmitir os registros processados para o coletor.

Gravar em um coletor DLT com um fluxo de acréscimo

Com o coletor configurado, a próxima etapa é gravar registros processados nele, especificando-o como o destino para a saída de registros por um fluxo de acréscimo. Você faz isso especificando sua pia como o valor target no decorador de append_flow.

  • Para tabelas gerenciadas e externas do Unity Catalog, use o formato delta e especifique o caminho ou o nome da tabela nas opções. Os seus pipelines de DLT devem ser configurados para usar o Unity Catalog.
  • Para tópicos do Apache Kafka, use o formato kafka e especifique o nome do tópico, as informações de conexão e as informações de autenticação nas opções. Estas são as mesmas opções que um coletor Spark Structured Streaming Kafka suporta. Consulte Configurar o gravador de Kafka Structured Streaming.
  • Para Hubs de Eventos do Azure, use o formato kafka e especifique o nome, as informações de conexão e as informações de autenticação dos Hubs de Eventos nas opções. Essas são as mesmas opções suportadas em um coletor de Hubs de Eventos do Spark Structured Streaming que usa a interface Kafka. Consulte autenticação de principal de serviço com o Microsoft Entra ID e os Hubs de Eventos do Azure.
  • Para tabelas de metastore do Hive, use o formato delta e especifique o caminho ou o nome da tabela nas opções. Os seus pipelines de DLT devem ser configurados para utilizar o metastore do Hive.

Abaixo estão exemplos de como configurar fluxos para gravar em coletores de Hubs de Eventos Delta, Kafka e Azure com registros processados pelo pipeline de DLT.

Sumidouro Delta

@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

Kafka e Azure Event Hubs coletores

@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
  spark.readStream.table("spark_referrers")
  .selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)

O parâmetro value é obrigatório para um coletor de Hubs de Eventos do Azure. Parâmetros adicionais, como key, partition, headerse topic são opcionais.

Para obter mais detalhes sobre o decorador de append_flow, consulte Usar fluxo de acréscimo para gravar em uma tabela de streaming a partir de vários fluxos de origem.

Limitações

  • Apenas a API Python é suportada. SQL não é suportado.

  • Somente consultas de streaming usando spark.readStream e dlt.read_stream são suportadas. Não há suporte para consultas em lote.

  • Apenas append_flow pode ser usado para gravar em pias. Não há suporte para outros fluxos, como apply_changes, e você não pode usar um coletor em uma definição de conjunto de dados DLT. Por exemplo, o seguinte não é suportado:

    @table("from_sink_table")
    def fromSink():
      return read_stream("my_sink")
    
  • Para dissipadores Delta, o nome da tabela deve ser totalmente qualificado. Especificamente, para tabelas externas gerenciadas pelo Unity Catalog, o nome da tabela deve ter o formato <catalog>.<schema>.<table>. Para o metastore do Hive, ele deve estar na forma <schema>.<table>.

  • A execução FullRefresh não limpará os dados de resultados previamente computados nos coletores. Isso significa que todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.

  • As expectativas em relação à tecnologia DLT não são atendidas.

Recursos