Caricare dati con Delta Live Tables
È possibile caricare dati da qualsiasi origine dati supportata da Apache Spark in Azure Databricks usando Delta Live Tables. È possibile definire set di dati (tables e views) in Delta Live Tables su qualsiasi query che restituisca un dataframe Spark, inclusi i dataframe di streaming e Pandas per i dataframe Spark. Per le attività di inserimento dati, Databricks consiglia di usare lo streaming tables per la maggior parte dei casi d'uso. Gli streaming tables sono utili per l'ingestione di dati dagli storage di oggetti cloud utilizzando Auto Loader o da bus di messaggistica come Kafka. Gli esempi seguenti illustrano alcuni modelli comuni.
Importante
Non tutte le origini dati supportano SQL. È possibile combinare notebook SQL e notebook Python in una pipeline di Tables Delta Live per usare SQL per tutte le operazioni oltre l'acquisizione dei dati.
Per informazioni dettagliate sull'uso delle librerie non incluse in pacchetti di Delta Live Tables di default, vedere Gestire le dipendenze di Python per le pipeline Delta Live Tables.
Caricare file dall'archiviazione di oggetti cloud
Databricks consiglia di usare il caricatore automatico con Delta Live Tables per la maggior parte delle attività di inserimento dati dall'archiviazione di oggetti cloud. Il caricatore automatico e Delta Live Tables è progettato per caricare in modo incrementale e idempotente i dati in continua crescita man mano che arrivano nella memoria cloud. Gli esempi seguenti usano Il caricatore automatico per creare set di dati da file CSV e JSON:
Nota
Per caricare i file con Auto Loader in una pipeline abilitata per Unity Catalog, è necessario usare percorsi esterni. Per ulteriori informazioni sull'uso di Unity Catalog con Delta Live Tables, vedere Usare Unity Catalog con le pipeline Delta Live Tables.
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
Vedere Che cos'è il caricatore automatico? e la sintassi SQL del caricatore automatico.
Avviso
Se usi il caricatore automatico con le notifiche dei file e devi eseguire un refresh completo per la pipeline o lo streaming table, è necessario pulire manualmente le risorse. È possibile usare CloudFilesResourceManager in un notebook per eseguire la pulizia.
Caricare dati da un bus di messaggi
È possibile configurare pipeline Delta Live Tables per inserire dati da bus di messaggio in streaming tables. Databricks consiglia di combinare lo streaming tables con l'esecuzione continua e una scalabilità automatica ottimizzata per offrire un'integrazione più efficiente nel caricamento a bassa latenza dai bus di messaggi. Vedere Optimize l'utilizzo del cluster delle pipeline di Tables Delta Live con scalabilità automatica avanzata.
Ad esempio, il codice seguente configura uno schema di streaming table per acquisire dati da Kafka.
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
È possibile scrivere operazioni downstream in SQL puro per eseguire trasformazioni di streaming su questi dati, come nell'esempio seguente:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(LIVE.kafka_raw)
WHERE ...
Per un esempio di utilizzo di Hub eventi, vedere Usare Hub eventi di Azure come origine dati Delta Live Tables.
Vedere Configurare le origini dei dati di streaming.
Caricare dati da sistemi esterni
Delta Live Tables supporta il caricamento di dati da qualsiasi origine dati supportata da Azure Databricks. Vedere Connettersi alle origini dati. È anche possibile caricare dati esterni usando Lakehouse Federation per le origini dati supportate. Poiché Lakehouse Federation richiede Databricks Runtime 13.3 LTS o versione successiva, per usare Lakehouse Federation la pipeline deve essere configurata per usare il canale di anteprima.
Alcune origini dati non hanno supporto equivalente in SQL. Se non è possibile usare Lakehouse Federation con una di queste origini dati, è possibile usare un notebook Python per inserire dati dall'origine. È possibile aggiungere codice sorgente Python e SQL alla stessa pipeline di Tables Delta Live. L'esempio seguente dichiara una vista materializzata per accedere allo stato corrente dei dati in un tablePostgreSQL remoto:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Caricare set di dati statici o di piccole dimensioni dall'archiviazione di oggetti cloud
È possibile caricare set di dati statici o di piccole dimensioni usando la sintassi di caricamento di Apache Spark. Delta Live Tables supporta tutti i formati di file supportati da Apache Spark in Azure Databricks. Per un listcompleto, vedere opzioni di formato dati .
Gli esempi seguenti illustrano il caricamento di JSON per creare delta Live Tablestables:
Python
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
Nota
Il SELECT * FROM format.`path`;
costrutto SQL è comune a tutti gli ambienti SQL in Azure Databricks. È il modello consigliato per l'accesso diretto ai file usando SQL con Delta Live Tables.
Accedere in modo sicuro agli archivi credentials utilizzando segreti in una pipeline
È possibile usare azure Databricks segreti per archiviare credentials, ad esempio chiavi di accesso o password. Per configurare il segreto nella pipeline, usare una proprietà Spark nella configurazione del cluster delle impostazioni della pipeline. Consultare Configurare il calcolo per una Delta Live Pipeline Tables.
L'esempio seguente usa un segreto per archiviare una chiave di accesso necessaria per leggere i dati di input da un account di archiviazione di Azure Data Lake Storage Gen2 (ADLS Gen2) usando il caricatore automatico. È possibile usare questo stesso metodo per configurare qualsiasi segreto richiesto dalla pipeline, ad esempio le chiavi AWS per accedere a S3 o la password a un metastore Apache Hive.
Per altre informazioni sull'uso di Azure Data Lake Storage Gen2, vedere Connettersi ad Azure Data Lake Storage Gen2 e archiviazione BLOB.
Nota
È necessario aggiungere il spark.hadoop.
prefisso alla spark_conf
chiave di configurazione che imposta il valore del segreto.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
Sostituzione
-
<storage-account-name>
con il nome dell'account di archiviazione DILS Gen2. -
<scope-name>
con il nome dell'ambito del segreto di Azure Databricks. -
<secret-name>
con il nome della chiave contenente la chiave di accesso dell'account di archiviazione di Azure.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Sostituzione
-
<container-name>
con il nome del contenitore dell'account di archiviazione di Azure che archivia i dati di input. -
<storage-account-name>
con il nome dell'account di archiviazione DILS Gen2. -
<path-to-input-dataset>
con il percorso del set di dati di input.
Caricare dati da Hub eventi di Azure
Hub eventi di Azure è un servizio di streaming di dati che fornisce un'interfaccia compatibile con Apache Kafka. È possibile usare il connettore Structured Streaming Kafka, incluso nel runtime di Tables Delta Live, per caricare messaggi da Hub eventi di Azure. Per altre informazioni sul caricamento e l'elaborazione dei messaggi da Hub eventi di Azure, vedere Usare Hub eventi di Azure come origine dati delta live Tables.