Dela via


Kör din första ETL-arbetsbelastning på Azure Databricks

Lär dig hur du använder produktionsklara verktyg från Azure Databricks för att utveckla och distribuera dina första ETL-pipelines (extrahering, transformering och inläsning) för dataorkestrering.

I slutet av den här artikeln kommer du att känna dig bekväm:

  1. Starta ett Databricks-beräkningskluster för alla syften.
  2. Skapa en Databricks-anteckningsbok.
  3. Konfigurera inkrementell datainmatning till Delta Lake med automatisk inläsning.
  4. Köra notebook-celler för att bearbeta, fråga och förhandsgranska data.
  5. Schemalägga en notebook-fil som ett Databricks-jobb.

I den här självstudien används interaktiva notebook-filer för att slutföra vanliga ETL-uppgifter i Python eller Scala.

Du kan också använda Delta Live Tables för att skapa ETL-pipelines. Databricks skapade Delta Live Tables för att minska komplexiteten med att skapa, distribuera och underhålla ETL-pipelines för produktion. Se Självstudie: Kör din första Delta Live Tables-pipeline.

Du kan också använda Databricks Terraform-providern för att skapa den här artikelns resurser. Se Skapa kluster, notebook-filer och jobb med Terraform.

Krav

Kommentar

Om du inte har behörighet för klusterkontroll kan du fortfarande slutföra de flesta av stegen nedan så länge du har åtkomst till ett kluster.

Steg 1: Skapa ett kluster

Om du vill utföra undersökande dataanalys och datateknik skapar du ett kluster för att tillhandahålla de beräkningsresurser som krävs för att köra kommandon.

  1. Klicka på beräkningsikon Beräkna i sidofältet.
  2. På sidan Beräkning klickar du på Skapa kluster. Då öppnas sidan Nytt kluster.
  3. Ange ett unikt namn för klustret, lämna de återstående värdena i standardtillståndet och klicka på Skapa kluster.

Mer information om Databricks-kluster finns i Beräkning.

Steg 2: Skapa en Databricks-notebook-fil

Om du vill skapa en notebook-fil på arbetsytan klickar du på Ny ikon Nytt i sidofältet och klickar sedan på Notebook. En tom anteckningsbok öppnas på arbetsytan.

Mer information om hur du skapar och hanterar notebook-filer finns i Hantera notebook-filer.

Steg 3: Konfigurera automatisk inläsning för att mata in data till Delta Lake

Databricks rekommenderar att du använder automatisk inläsning för inkrementell datainmatning. Automatisk inläsning identifierar och bearbetar automatiskt nya filer när de tas emot i molnobjektlagring.

Databricks rekommenderar att du lagrar data med Delta Lake. Delta Lake är ett öppen källkod lagringslager som tillhandahåller ACID-transaktioner och möjliggör datasjöhuset. Delta Lake är standardformatet för tabeller som skapats i Databricks.

Om du vill konfigurera automatisk inläsning för att mata in data till en Delta Lake-tabell kopierar du och klistrar in följande kod i den tomma cellen i anteckningsboken:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Kommentar

Variablerna som definieras i den här koden bör göra det möjligt att köra den på ett säkert sätt utan risk för konflikt med befintliga arbetsytetillgångar eller andra användare. Begränsade nätverks- eller lagringsbehörigheter ger upphov till fel när den här koden körs. kontakta arbetsyteadministratören för att felsöka dessa begränsningar.

Mer information om automatisk inläsning finns i Vad är automatisk inläsning?.

Steg 4: Bearbeta och interagera med data

Notebook-filer kör logikcell för cell. Så här kör du logiken i cellen:

  1. Om du vill köra cellen som du slutförde i föregående steg markerar du cellen och trycker på SKIFT+RETUR.

  2. Om du vill köra frågor mot tabellen som du just har skapat kopierar du och klistrar in följande kod i en tom cell och trycker sedan på SKIFT+RETUR för att köra cellen.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Om du vill förhandsgranska data i dataramen kopierar du och klistrar in följande kod i en tom cell och trycker sedan på SKIFT+RETUR för att köra cellen.

    Python

    display(df)
    

    Scala

    display(df)
    

Mer information om interaktiva alternativ för att visualisera data finns i Visualiseringar i Databricks Notebooks.

Steg 5: Schemalägga ett jobb

Du kan köra Databricks-notebook-filer som produktionsskript genom att lägga till dem som en uppgift i ett Databricks-jobb. I det här steget skapar du ett nytt jobb som du kan utlösa manuellt.

Så här schemalägger du anteckningsboken som en uppgift:

  1. Klicka på Schemalägg till höger i rubrikfältet.
  2. Ange ett unikt namn för jobbnamnet.
  3. Klicka på Manuell.
  4. I listrutan Kluster väljer du det kluster som du skapade i steg 1.
  5. Klicka på Skapa.
  6. I fönstret som visas klickar du på Kör nu.
  7. Om du vill se resultatet av jobbkörningen Extern länk klickar du på ikonen bredvid tidsstämpeln Senaste körning .

Mer information om jobb finns i Vad är Databricks-jobb?.

Ytterligare integreringar

Läs mer om integreringar och verktyg för datateknik med Azure Databricks: