Trasmettere record a servizi esterni utilizzando le destinazioni di Delta Live Tables
Importante
L'API sink
Tabelle Live Delta si trova in Anteprima Pubblica.
Questo articolo descrive l'API Delta Live Tables sink
e come usarla con i flussi DLT per scrivere record trasformati da una pipeline in un sink di dati esterno, ad esempio tabelle gestite e esterne del catalogo Unity, tabelle metastore Hive e servizi di streaming di eventi come Apache Kafka o Hub eventi di Azure.
Che cosa sono i sink di Tabelle live Delta?
I sink delle tabelle live Delta consentono di scrivere dati trasformati verso destinazioni come servizi di streaming di eventi, come Apache Kafka o Hub di eventi di Azure, e tabelle esterne gestite dal Catalogo Unity o dal metastore Hive. In precedenza, le tabelle di streaming e le viste materializzate create in una pipeline di tabelle live Delta potrebbero essere rese persistenti solo nelle tabelle Delta gestite di Azure Databricks. Usando i sink, sono ora disponibili altre opzioni per rendere persistente l'output delle pipeline di tabelle live Delta.
Quando è consigliabile usare sink di tabelle live Delta?
Databricks consiglia di utilizzare i sink delle Delta Live Tables se necessario:
- Creare un caso d'uso operativo come il rilevamento delle frodi, l'analisi in tempo reale e le raccomandazioni dei clienti. I casi d'uso operativi in genere leggono i dati da un bus di messaggi, ad esempio un argomento Apache Kafka, e quindi elaborano i dati con bassa latenza e scrivono nuovamente i record elaborati in un bus di messaggi. Questo approccio consente di ottenere una latenza inferiore senza scrivere o leggere dall'archiviazione cloud.
- Scrivere dati trasformati dai flussi di Delta Live Tables alle tabelle gestite da un'istanza Delta esterna, incluse le tabelle Unity Catalog gestite ed esterne e le tabelle metastore Hive.
- Eseguire il caricamento di estrazione inverso (ETL) in sink esterni a Databricks, ad esempio gli argomenti di Apache Kafka. Questo approccio consente di supportare in modo efficace i casi d'uso in cui i dati devono essere letti o usati all'esterno delle tabelle di Unity Catalog o di altre risorse di archiviazione gestite da Databricks.
Come si usano le destinazioni di Delta Live Tables?
Nota
- Sono supportate solo le query di streaming che usano
spark.readStream
edlt.read_stream
. Le query batch non sono supportate. - È possibile usare solo
append_flow
per scrivere nei sink. Altri flussi, ad esempioapply_changes
, non sono supportati. - L'esecuzione di un aggiornamento completo non pulisce i dati dei risultati calcolati in precedenza nei sink. Ciò significa che tutti i dati elaborati verranno aggiunti al sink e i dati esistenti non verranno modificati.
Quando i dati degli eventi vengono inseriti da una sorgente di streaming nella tua pipeline Delta Live Tables, puoi elaborare e perfezionare questi dati utilizzando la funzionalità Delta Live Tables e successivamente utilizzare l'elaborazione del flusso di accodamento per trasmettere i record di dati trasformati a un sink Delta Live Tables. Questo sink viene creato usando la funzione create_sink()
. Per ulteriori dettagli sull'uso della funzione create_sink
, consultare il riferimento dell'API sink .
Per implementare un sink di Tabelle live Delta, seguire questa procedura:
- Configurare una pipeline Delta Live Tables per elaborare i dati degli eventi di streaming e preparare i record di dati per la scrittura in un sink Delta Live Tables.
- Configurare e creare il sink delle tabelle live Delta per usare il formato di sink di destinazione preferito.
- Usare un flusso di append per scrivere i record preparati nel sink.
Questi passaggi sono trattati nel resto dell'argomento.
Configurare una pipeline Delta Live Tables per preparare i record per la scrittura in una destinazione di output
Il primo passaggio consiste nel configurare una pipeline di Delta Live Tables per trasformare i dati del flusso di eventi non elaborati nei dati preparati che verranno scritti nel sink.
Per comprendere meglio questo processo, è possibile seguire questo esempio di una pipeline Delta Live Tables che elabora i dati degli eventi clickstream, dai dati di esempio wikipedia-datasets
in Databricks. Questa pipeline analizza il set di dati non elaborato per identificare le pagine di Wikipedia che si collegano a una pagina della documentazione di Apache Spark e affina progressivamente i dati in base alle righe della tabella in cui il collegamento di riferimento contiene Apache_Spark.
In questo esempio, la pipeline di Tabelle Live Delta è strutturata usando l'architettura medallion, che organizza i dati in livelli diversi per migliorare la qualità e l'efficienza di elaborazione.
Per iniziare, carica i record JSON non elaborati dal set di dati nel livello bronze usando Caricatore Automatico. Questo codice Python illustra come creare una tabella di streaming denominata clickstream_raw
, che contiene i dati grezzi non elaborati provenienti dall'origine:
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
Dopo l'esecuzione di questo codice, i dati sono ora al livello "bronze" (o "dati non elaborati") dell'architettura Medallion e devono essere puliti. Il passaggio successivo consente di perfezionare i dati al livello "silver", che comporta la pulizia dei tipi di dati e dei nomi di colonna e l'uso delle aspettative delle tabelle live Delta per garantire l'integrità dei dati.
Il codice seguente illustra come eseguire questa operazione pulendo e convalidando i dati del livello bronzo nella tabella clickstream_clean
silver:
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
Per sviluppare il livello "gold" della struttura della pipeline, filtri i dati clickstream puliti per isolare le voci in cui la pagina di riferimento è Apache_Spark
. In questo ultimo esempio di codice si selezionano solo le colonne necessarie per la scrittura nella tabella sink di destinazione.
Il codice seguente illustra come creare una tabella denominata spark_referrers
che rappresenta il livello oro:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
Al termine di questo processo di preparazione dei dati, è necessario configurare i sink di destinazione in cui verranno scritti i record puliti.
Configurare un sink di tabelle live Delta
Databricks supporta tre tipi di sink di destinazione in cui si scrivono i record elaborati dai dati del flusso:
- Sink di tabella Delta
- Sink di Apache Kafka
- Destinazioni degli Hub eventi di Azure
Di seguito sono riportati esempi di configurazioni per sink Delta, Kafka e Azure Event Hubs.
Sink differenziali
Per creare un sink Delta in base al percorso del file:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Per creare un sink Delta in base al nome della tabella usando un catalogo completo e un percorso dello schema:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Sink Kafka e Hub eventi di Azure
Questo codice funziona sia per i sink apache Kafka che per Hub eventi di Azure.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Ora che il sink è configurato e la pipeline di Delta Live Tables è preparata, è possibile iniziare a trasmettere i record elaborati al sink.
Scrivere in un sink di Tabelle Live Delta con un'operazione di accodamento
Dopo aver configurato il sink, il passaggio successivo consiste nello scrivere i record elaborati specificandolo come destinazione per i record prodotti da un flusso di aggiunta. Per fare ciò, specifica il sink come valore target
nel decorator append_flow
.
- Per le tabelle gestite ed esterne di Unity Catalog, usare il formato
delta
e specificare il percorso o il nome della tabella nelle opzioni. Le pipeline Delta Live Tables devono essere configurate per utilizzare il Catalogo Unity. - Per gli argomenti di Apache Kafka, usare il formato
kafka
e specificare il nome dell'argomento, le informazioni di connessione e le informazioni di autenticazione nelle opzioni. Queste sono le stesse opzioni supportate da un sink Kafka di Spark Structured Streaming. Consulta Configurare il writer di streaming strutturato Kafka. - Per Hub eventi di Azure, usare il formato
kafka
e specificare il nome, le informazioni di connessione e le informazioni di autenticazione di Hub eventi nelle opzioni. Queste sono le stesse opzioni supportate in un sink di Event Hubs per Spark Structured Streaming che utilizza l'interfaccia Kafka. Vedere 'autenticazione dell'entità servizio con l'ID Microsoft Entra e Hub eventi di Azure. - Per le tabelle metastore Hive, usare il formato
delta
e specificare il percorso o il nome della tabella nelle opzioni. Le pipeline "Delta Live Tables" devono essere configurate per l'uso del metastore Hive.
Di seguito sono riportati esempi di come configurare i flussi per la scrittura in sink Delta, Kafka e Hub eventi di Azure con record elaborati dalla pipeline di tabelle live Delta.
Lavello Delta
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Sink di Kafka e Hub Eventi di Azure
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
Il parametro value
è obbligatorio per un sink di Hub eventi di Azure. Altri parametri, ad esempio key
, partition
, headers
e topic
sono facoltativi.
Per ulteriori dettagli sul decoratore append_flow
, vedere Utilizzare il flow di appending per scrivere in una tabella di streaming da più flussi di origine.
Limitazioni
È supportata solo l'API Python. SQL non è supportato.
Sono supportate solo le query di streaming che usano
spark.readStream
edlt.read_stream
. Le query batch non sono supportate.È possibile usare solo
append_flow
per scrivere nei sink. Altri flussi, ad esempioapply_changes
, non sono supportati e non è possibile usare un sink in una definizione di set di dati Delta Live Tables. Ad esempio, il codice seguente non è supportato:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
Per i sink Delta, il nome della tabella deve essere completamente qualificato. In particolare, per le tabelle esterne gestite dal catalogo Unity, il nome della tabella deve essere nel formato
<catalog>.<schema>.<table>
. Per il metastore Hive, deve essere nel formato<schema>.<table>
.L'esecuzione di
FullRefresh
non pulisce i dati dei risultati calcolati in precedenza nei sink. Ciò significa che tutti i dati elaborati verranno aggiunti al sink e i dati esistenti non verranno modificati.Le funzionalità di aspettative delle Delta Live Tables non sono supportate.