Freigeben über


Führen Sie Ihren ersten ETL-Workload auf Azure Databricks durch

Erfahren Sie, wie Sie produktionsfähige Tools von Azure Databricks verwenden, um Ihre ersten ETL-Pipelines (Extract, Transform and Load, ETL) für die Daten-Orchestrierung zu entwickeln und bereitzustellen.

Am Ende dieses Artikels sind Sie mit Folgendem vertraut:

  1. Starten eines Databricks-All-Purpose-Computeclusters.
  2. Erstellen eines Databricks-Notebooks
  3. Konfigurieren der inkrementellen Datenerfassung in Delta Lake mit Autoloader.
  4. Ausführen von Notebookzellen zum Verarbeiten und Abfragen von Daten sowie zum Anzeigen einer Vorschau der Daten
  5. Planen eines Notebooks als Databricks-Auftrag

In diesem Tutorial werden interaktive Notebooks verwendet, um allgemeine ETL-Aufgaben in Python oder Scala auszuführen.

Sie können außerdem Delta Live Tables verwenden, um ETL-Pipelines zu erstellen. Databricks hat Delta Live Tables erstellt, um die Komplexität des Erstellens, Bereitstellens und Verwaltens von ETL-Pipelines in der Produktion zu reduzieren. Siehe Tutorial: Ausführen Ihrer ersten Delta Live Tables-Pipeline.

Sie können außerdem den Databricks Terraform-Anbieter verwenden, um die Ressourcen dieses Artikels zu erstellen. Weitere Informationen finden Sie unter Erstellen von Clustern, Notebooks und Aufträgen mit Terraform.

Anforderungen

Hinweis

Wenn Sie keine Clustersteuerungsberechtigungen haben, können Sie dennoch die meisten der folgenden Schritte ausführen, solange Sie Zugriff auf einen Cluster haben.

Schritt 1: Erstellen eines Clusters

Erstellen Sie für die explorative Datenanalyse und Datentechnik einen Cluster, um die für die Ausführung von Befehlen erforderlichen Computeressourcen bereitzustellen.

  1. Klicken Sie auf der Seitenleiste auf Computesymbol Compute.
  2. Klicken Sie auf der Seite „Compute“ auf Cluster erstellen. Dadurch wird die Seite „Neuer Cluster“ geöffnet.
  3. Geben Sie einen eindeutigen Namen für den Cluster an, lassen Sie die verbleibenden Werte in ihrem Standardzustand und klicken Sie auf Cluster erstellen.

Weitere Informationen zu Databricks-Clustern finden Sie unter Compute.

Schritt 2: Erstellen eines Databricks-Notebooks

Wenn Sie ein Notebook in Ihrem Arbeitsbereich erstellen möchten, wählen Sie in der Randleiste Neues SymbolNeu aus, und wählen Sie dann Notebook aus. Im Arbeitsbereich wird ein leeres Notebook geöffnet.

Weitere Informationen zum Erstellen und Verwalten von Notebooks finden Sie unter Verwalten von Notebooks.

Schritt 3: Konfigurieren des Autoloaders zum Erfassen von Daten in Delta Lake

Databricks empfiehlt die Verwendung des Autoloaders für die inkrementelle Datenerfassung. Der Autoloader erkennt und verarbeitet automatisch neue Dateien, sobald sie im Cloudobjektspeicher empfangen werden.

Databricks empfiehlt das Speichern von Daten mit Delta Lake. Delta Lake ist eine Open Source-Speicherebene, die ACID-Transaktionen einführt und Data Lakehouse aktiviert. Delta Lake ist das Standardformat für Tabellen, die in Databricks erstellt wurden.

Wenn Sie Autoloader zum Erfassen von Daten in einer Delta Lake-Tabelle konfigurieren möchten, kopieren Sie den folgenden Code in eine leere Zelle in Ihrem Notebook:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Hinweis

Die in diesem Code definierten Variablen sollten Ihnen eine sichere Ausführung ermöglichen, ohne dass es zu Konflikten mit bestehenden Arbeitsbereichsressourcen oder anderen Benutzern kommt. Eingeschränkte Netzwerk- oder Speicherberechtigungen lösen beim Ausführen dieses Codes Fehler aus. Wenden Sie sich an Ihren Arbeitsbereichsadministrator, um diese Einschränkungen zu behandeln.

Um mehr über Auto Loader zu erfahren, siehe Was ist Auto Loader?.

Schritt 4: Verarbeiten von und Interagieren mit Daten

Notebooks führen Logik Zelle für Zelle aus. Führen Sie die folgenden Schritte aus, um die Logik in Ihrer Zelle auszuführen:

  1. Wenn Sie die im vorherigen Schritt abgeschlossene Zelle ausführen möchten, wählen Sie die Zelle aus, und drücken Sie UMSCHALT+EINGABE.

  2. Wenn Sie die gerade erstellte Tabelle abfragen möchten, kopieren Sie den folgenden Code in eine leere Zelle, und drücken Sie dann UMSCHALT+EINGABE, um die Zelle auszuführen.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Wenn Sie eine Vorschau der Daten im Datenrahmen anzeigen möchten, kopieren Sie den folgenden Code in eine leere Zelle, und drücken Sie dann UMSCHALT+EINGABE, um die Zelle auszuführen.

    Python

    display(df)
    

    Scala

    display(df)
    

Weitere Informationen zu interaktiven Optionen zur Visualisierung von Daten finden Sie unter Visualisierungen in Databricks Notebooks.

Schritt 5: Planen eines Auftrags

Sie können Databricks-Notebooks als Produktionsskripts ausführen, indem Sie sie als Aufgabe in einem Databricks-Auftrag hinzufügen. In diesem Schritt erstellen Sie einen neuen Auftrag, den Sie manuell auslösen können.

So planen Sie Ihr Notebook als Aufgabe

  1. Klicken Sie auf der rechten Seite der Kopfleiste auf Planen.
  2. Geben Sie einen eindeutigen Namen unter Auftragsname ein.
  3. Klicken Sie auf Manuell.
  4. Wählen Sie in der Dropdownliste Cluster den Cluster aus, den Sie in Schritt 1 erstellt haben.
  5. Klicken Sie auf Erstellen.
  6. Klicken Sie im angezeigten Fenster auf Jetzt ausführen.
  7. Klicken Sie neben dem Zeitstempel Letzte Ausführung auf das Symbol Externer Link, um die Ergebnisse der Auftragsausführung anzuzeigen.

Weitere Informationen zu Aufträgen finden Sie unter Was sind Databricks-Aufträge?.

Zusätzliche Integrationen

Erfahren Sie mehr über Integrationen und Tools für Datentechnik mit Azure Databricks: