Condividi tramite


Eseguire il primo carico di lavoro ETL in Azure Databricks

Informazioni su come usare gli strumenti pronti per la produzione di Azure Databricks per sviluppare e distribuire le prime pipeline di estrazione, trasformazione e caricamento (ETL) per l'orchestrazione dei dati.

Al termine di questo articolo, si avrà familiarità con:

  1. Avvio di un cluster di calcolo multiuso di Databricks.
  2. Creazione di un notebook di Databricks.
  3. Configurazione dell'inserimento incrementale dei dati in Delta Lake con il caricatore automatico.
  4. Esecuzione di celle del notebook per elaborare, esecuzione di query e visualizzazione dei dati in anteprima.
  5. Pianificazione di un notebook come processo di Databricks.

Questa esercitazione usa notebook interattivi per completare attività ETL comuni in Python o Scala.

È anche possibile usare delta live Tables per compilare pipeline ETL. Databricks ha creato delta Live Tables per ridurre la complessità della compilazione, della distribuzione e della gestione delle pipeline ETL di produzione. Consulta Esercitazione: Eseguire una prima pipeline Delta Live Tables.

È anche possibile usare il provider Databricks Terraform per creare le risorse di questo articolo. Consultare Creare cluster, notebook e processi con Terraform.

Requisiti

Nota

Se non si dispone dei privilegi di controllo del cluster, è comunque possibile completare la maggior parte dei passaggi seguenti purché si abbia accesso a un cluster.

Passaggio 1: Creare un cluster

Per eseguire l'analisi esplorativa dei dati e la progettazione dei dati, creare un cluster per fornire le risorse di calcolo necessarie per eseguire i comandi.

  1. Nella barra laterale fare clic su icona dell’ambiente di calcoloAmbiente di calcolo.
  2. Nella pagina dell’ambiente di calcolo, fare clic su Crea cluster. Verrà visualizzata la pagina Nuovo cluster.
  3. Specificare un nome univoco per il cluster, lasciare gli altri values impostati sul loro stato predefinito, e fare clic su Crea Cluster.

Per altre informazioni sui cluster Databricks, consultare Ambiente di calcolo.

Passaggio 2: Creare un notebook in Databricks

Per creare un Notebook nell'area di lavoro, fare clic su Nuova iconaNuovo nella barra laterale e quindi su Notebook. Viene aperto un Notebook vuoto nell'area di lavoro.

Per altre informazioni sulla creazione e la gestione dei Notebook, vedere Gestire i Notebook.

Passaggio 3: Configurare il caricatore automatico per inserire dati in Delta Lake

Databricks consiglia di usare il caricatore automatico per l'inserimento incrementale dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud.

Databricks consiglia di archiviare i dati con Delta Lake. Delta Lake è un livello di archiviazione open source che fornisce transazioni ACID e abilita data lakehouse. Delta Lake è il formato predefinito per tables creato in Databricks.

Per configurare Auto Loader per inserire i dati in Delta Lake table, copiare e incollare il codice seguente nella cella vuota del notebook.

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Nota

Le variabili definite in questo codice devono consentire di eseguirla in modo sicuro senza rischiare conflitti con gli asset dell'area di lavoro esistenti o altri utenti. Le autorizzazioni di rete o archiviazione limitate genereranno errori durante l'esecuzione di questo codice; contattare l'amministratore dell'area di lavoro per risolvere questi problemi.

Per altre informazioni sul caricatore automatico, consultare Che cos'è il caricatore automatico?.

Passaggio 4: Elaborare e interagire con i dati

I notebook eseguono la logica cell-by-cell. Per eseguire la logica nella cella:

  1. Per eseguire la cella completata nel passaggio precedente, select la cella e premere MAIUSC+INVIO.

  2. Per eseguire una query sulla table appena creata, copiare e incollare il codice seguente in una cella vuota, quindi premere MAIUSC+INVIO per eseguire la cella.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Per visualizzare in anteprima i dati nel dataframe, copiare e incollare il seguente codice in una cella vuota, quindi premere MAIUSC+INVIO per eseguire la cella.

    Python

    display(df)
    

    Scala

    display(df)
    

Per altre informazioni sulle opzioni interattive per la visualizzazione dei dati, consultare Visualizzazioni nei notebook di Databricks.

Passaggio 5: Pianificare un processo

È possibile eseguire notebook di Databricks come script di produzione aggiungendoli come attività in un processo di Databricks. In questo passaggio verrà creato un nuovo processo che è possibile attivare manualmente.

Per pianificare il notebook come attività:

  1. Fare clic su Pianifica sul lato destro della barra di intestazione.
  2. Immettere un nome univoco in Nome del processo.
  3. Fare clic su Manuale.
  4. Nell'elenco a discesa cluster il cluster creato nel passaggio 1.
  5. Cliccare su Crea.
  6. Nella window visualizzata fare clic su Esegui ora.
  7. Per visualizzare i risultati dell'esecuzione del processo, fare clic sull'icona Collegamento esterno accanto al timestamp Ultima esecuzione.

Per altre informazioni sui processi, consultare Che cosa sono i processi di Databricks?.

Integrazioni aggiuntive

Altre informazioni sulle integrazioni e sugli strumenti per la progettazione dei dati con Azure Databricks: