Transmita registros para serviços externos com os coletores Delta Live Tables
Este artigo descreve a API do sink
Delta Live Tables e como usá-la com fluxos de DLT para escrever registos transformados por um pipeline num coletor de dados externo, como tabelas geridas e externas do Unity Catalog, tabelas de metastore do Hive e serviços de streaming de eventos, como Apache Kafka ou Hubs de Eventos do Azure.
O que são pias Delta Live Tables?
Os recetores Delta Live Tables permitem-lhe escrever dados transformados para destinos, como serviços de streaming de eventos, como o Apache Kafka ou os Hubs de Eventos do Azure, e tabelas externas geridas pelo Unity Catalog ou pelo metastore do Hive. Anteriormente, as tabelas de streaming e visões materializadas criadas num pipeline do Delta Live Tables podiam ser persistidas apenas em tabelas Delta geridas pelo Azure Databricks. Usando coletores, agora você tem mais opções para persistir a saída de seus pipelines Delta Live Tables.
Quando devo usar os lavatórios Delta Live Tables?
A Databricks recomenda o uso de dissipadores Delta Live Tables se você precisar:
- Crie um caso de uso operacional, como deteção de fraudes, análises em tempo real e recomendações de clientes. Os casos de uso operacionais normalmente leem dados a partir de um barramento de mensagens, como um tópico do Apache Kafka, processam os dados com baixa latência e escrevem os registos processados de volta para um barramento de mensagens. Essa abordagem permite que você obtenha latência mais baixa ao não escrever ou ler a partir do armazenamento em nuvem.
- Escreva dados transformados de seus fluxos do Delta Live Tables em tabelas gerenciadas por uma instância Delta externa, incluindo tabelas gerenciadas e externas do Unity Catalog e tabelas de metastore do Hive.
- Execute ETL (reverse extract-transform-load) em coletores externos ao Databricks, como tópicos do Apache Kafka. Essa abordagem permite que você ofereça suporte eficaz a casos de uso em que os dados precisam ser lidos ou usados fora das tabelas do Unity Catalog ou de outro armazenamento gerenciado pelo Databricks.
Como faço para usar as pias Delta Live Tables?
Observação
- Somente consultas de streaming usando
spark.readStream
edlt.read_stream
são suportadas. Não há suporte para consultas em lote. - Apenas
append_flow
pode ser usado para gravar em pias. Outros fluxos, comoapply_changes
, não são suportados. - A execução de uma atualização completa não limpa os dados de resultados computados anteriormente nos coletores. Isso significa que todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.
À medida que os dados de eventos são ingeridos de uma fonte de streaming no seu pipeline Delta Live Tables, você processa e refina esses dados usando a funcionalidade do Delta Live Tables e, em seguida, usa o processamento de fluxo de acréscimo para transmitir os registos de dados transformados para um destino do Delta Live Tables. Você cria esse coletor usando a função create_sink()
. Para obter mais detalhes sobre como usar a função create_sink
, consulte a referência da API do .
Para implementar um coletor Delta Live Tables, use as seguintes etapas:
- Configure um pipeline do Delta Live Tables para processar os dados de eventos de streaming e preparar registros de dados para gravação em um coletor do Delta Live Tables.
- Configure e crie o coletor Delta Live Tables para usar o formato de coletor de destino preferido.
- Use um de fluxo de acréscimo de para gravar os registros preparados no coletor.
Essas etapas são abordadas no restante do tópico.
Configurar um pipeline Delta Live Tables para preparar registros para gravação em um coletor
A primeira etapa é configurar um pipeline Delta Live Tables para transformar os dados brutos do fluxo de eventos nos dados preparados que você gravará no coletor.
Para entender melhor esse processo, você pode seguir este exemplo de um pipeline Delta Live Tables que processa dados de eventos de clickstream dos dados de exemplo wikipedia-datasets
no Databricks. Este pipeline analisa o conjunto de dados brutos para identificar páginas da Wikipédia que vinculam a uma página de documentação do Apache Spark e refina progressivamente esses dados apenas para as linhas da tabela onde o link de referência contém Apache_Spark.
Neste exemplo, o pipeline Delta Live Tables é estruturado usando a arquitetura de medalhão , que organiza os dados em diferentes camadas para melhorar a qualidade e a eficiência do processamento.
Para começar, carregue os registros JSON brutos do conjunto de dados em sua camada bronze usando Auto Loader. Este código Python demonstra como criar uma tabela de streaming chamada clickstream_raw
, que contém os dados brutos e não processados da fonte:
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
Depois que esse código é executado, os dados agora estão no nível "bronze" (ou "dados brutos") da arquitetura Medallion e devem ser limpos. A próxima etapa refina os dados para o nível "prata", que envolve a limpeza de tipos de dados e nomes de colunas e o uso das expectativas do Delta Live Tables para garantir a integridade dos dados.
O código a seguir demonstra como fazer isso limpando e validando os dados da camada de bronze na tabela clickstream_clean
prata:
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
Para desenvolver a camada "ouro" da estrutura do pipeline, filtre os dados de clickstream limpos, isolando as entradas onde a página de referência é Apache_Spark
. Neste último exemplo de código, você seleciona apenas as colunas necessárias para gravar na tabela de coletor de destino.
O código a seguir ilustra como criar uma tabela chamada spark_referrers
que representa a camada ouro:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
Após a conclusão deste processo de preparação de dados, o utilizador deve configurar os coletores de destino nos quais os registos limpos serão escritos.
Configurar um coletor Delta Live Tables
O Databricks suporta três tipos de coletores de destino nos quais você grava seus registros processados a partir de seus dados de fluxo:
- Pias de mesa delta
- Pias Apache Kafka
- Coletores de Hubs de Eventos do Azure
Abaixo estão exemplos de configurações para coletores de Hubs de Eventos Delta, Kafka e Azure:
Sumidouros Delta
Para criar um "Delta sink" por caminho de arquivo:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Para criar um coletor Delta por nome de tabela usando um catálogo totalmente qualificado e um caminho de esquema:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Kafka e Azure Event Hubs sinks
Esse código funciona para coletores do Apache Kafka e dos Hubs de Eventos do Azure.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Agora que o coletor está configurado e o pipeline Delta Live Tables está preparado, você pode começar a transmitir os registros processados para o coletor.
Gravar em um coletor Delta Live Tables com um fluxo de acréscimo
Com o coletor configurado, a próxima etapa é gravar registros processados nele, especificando-o como o destino para a saída de registros por um fluxo de acréscimo. Você faz isso especificando sua pia como o valor target
no decorador de append_flow
.
- Para tabelas gerenciadas e externas do Unity Catalog, use o formato
delta
e especifique o caminho ou o nome da tabela nas opções. Os seus pipelines Delta Live Tables devem ser configurados para usar o Unity Catalog. - Para tópicos do Apache Kafka, use o formato
kafka
e especifique o nome do tópico, as informações de conexão e as informações de autenticação nas opções. Estas são as mesmas opções que um coletor Spark Structured Streaming Kafka suporta. Consulte Configurar o gravador de Kafka Structured Streaming. - Para Hubs de Eventos do Azure, use o formato
kafka
e especifique o nome, as informações de conexão e as informações de autenticação dos Hubs de Eventos nas opções. Essas são as mesmas opções suportadas em um coletor de Hubs de Eventos do Spark Structured Streaming que usa a interface Kafka. Consulte a autenticação do "Service Principal" com o Microsoft Entra ID e os Hubs de Eventos do Azure . - Para tabelas de metastore do Hive, use o formato
delta
e especifique o caminho ou o nome da tabela nas opções. Os seus pipelines do Delta Live Tables devem estar configurados para utilizar o metastore do Hive.
Abaixo estão exemplos de como configurar fluxos para gravar em coletores de Hubs de Eventos Delta, Kafka e Azure com registros processados pelo pipeline Delta Live Tables.
Sumidouro Delta
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Kafka e Azure Event Hubs coletores
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
O parâmetro value
é obrigatório para um coletor de Hubs de Eventos do Azure. Parâmetros adicionais, como key
, partition
, headers
e topic
são opcionais.
Para obter mais detalhes no decorador append_flow
, consulte Utilize o fluxo de adição para escrever numa tabela de fluxo a partir de vários fluxos de origem.
Limitações
Apenas a API Python é suportada. SQL não é suportado.
Somente consultas de streaming usando
spark.readStream
edlt.read_stream
são suportadas. Não há suporte para consultas em lote.Apenas
append_flow
pode ser usado para gravar em pias. Não há suporte para outros fluxos, comoapply_changes
, e você não pode usar um coletor em uma definição de conjunto de dados Delta Live Tables. Por exemplo, o seguinte não é suportado:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
Para Delta sinks, o nome da tabela deve ser totalmente qualificado. Especificamente, para tabelas externas gerenciadas pelo Unity Catalog, o nome da tabela deve ter o formato
<catalog>.<schema>.<table>
. Para o metastore do Hive, ele deve estar na forma<schema>.<table>
.A execução
FullRefresh
não limpará os dados de resultados previamente computados nos coletores. Isso significa que todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.As expectativas de funcionalidades do Delta Live Tables não são suportadas.