Freigeben über


Laden von Daten mit Delta Live Tables

Sie können mithilfe von Delta Live Tables Daten aus jeder beliebigen Datenquelle laden, die von Apache Spark in Azure Databricks unterstützt wird. Sie können Datasets (Tabellen und Ansichten) in Delta Live Tables für jede Abfrage definieren, die einen Spark DataFrame zurückgibt, einschließlich Streaming-DataFrames und Pandas für Spark-DataFrames. Für Datenerfassungsaufgaben empfiehlt Databricks für die meisten Anwendungsfälle die Verwendung von Streamingtabellen. Streamingtabellen eignen bei Verwendung des Autoloaders oder von Nachrichtenbussen wie Kafka gut zum Erfassen von Daten aus einem cloudbasierten Objektspeicher. Die folgenden Beispiele veranschaulichen einige gängige Muster.

Wichtig

Nicht alle Datenquellen bieten SQL-Unterstützung. Sie können SQL- und Python-Notebooks in einer Delta Live Tables-Pipeline kombinieren, um SQL über die Erfassung hinaus für alle Vorgänge zu verwenden.

Ausführliche Informationen zum Arbeiten mit Bibliotheken, die nicht standardmäßig in Delta Live Tables verpackt sind, finden Sie unter Verwalten von Python-Abhängigkeiten für Delta Live Tables-Pipelines.

Laden von Dateien aus einem cloudbasierten Objektspeicher

Databricks empfiehlt die Verwendung von Auto Loader mit Delta Live Tables für die meisten Datenerfassungsaufgaben aus dem Cloudobjektspeicher. Auto Loader und Delta Live Tables sind so konzipiert, dass sie inkrementell und idempotent ständig wachsende Datenmengen laden, sobald diese im Cloudspeicher eingehen. In den folgenden Beispielen wird der Autoloader verwendet, um Datasets aus CSV- und JSON-Dateien zu erstellen:

Hinweis

Um Dateien mit Autoloader in einer Pipeline mit Unity Catalog-Aktivierung zu laden, müssen Sie externe Speicherorte verwenden. Weitere Informationen zur Verwendung von Unity Catalog mit Delta Live Tables finden Sie unter Verwenden von Unity Catalog mit Ihren Delta Live Tables-Pipelines.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Weitere Informationen finden Sie unter Was ist Auto Loader? und Auto Loader-SQL-Syntax.

Warnung

Wenn Sie Auto Loader mit Dateibenachrichtigungen verwenden und eine vollständige Aktualisierung für Ihre Pipeline- oder Streamingtabelle ausführen, müssen Sie Ihre Ressourcen manuell bereinigen. Sie können für die Bereinigung den CloudFilesResourceManager in einem Notebook verwenden.

Laden von Daten aus einem Nachrichtenbus

Sie können Delta Live Tables-Pipelines so konfigurieren, dass Daten aus Nachrichtenbussen mit Streamingtabellen erfasst werden. Databricks empfiehlt die Kombination von Streamingtabellen mit fortlaufender Ausführung und verbesserter automatischer Skalierung, um die effizienteste Erfassung für das latenzarme Laden von Nachrichtenbussen zu erzielen. Weitere Informationen finden Sie unter Optimieren der Clusternutzung von Delta Live Tables-Pipelines mit verbesserter automatischer Skalierung.

Der folgende Code konfiguriert beispielsweise eine Streamingtabelle zum Erfassen von Daten aus Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Sie können nachgeschaltete Vorgänge in reinem SQL schreiben, um Streamingtransformationen für diese Daten auszuführen, wie im folgenden Beispiel gezeigt:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Ein Beispiel für die Arbeit mit Event Hubs finden Sie unter Verwenden von Azure Event Hubs als Delta Live Tables-Datenquelle.

Weitere Informationen finden Sie unter Konfigurieren von Streamingdatenquellen.

Laden von Daten von externen Speicherorten

Delta Live Tables unterstützt das Laden von Daten aus einer beliebigen Datenquelle, die von Azure Databricks unterstützt wird. Weitere Informationen finden Sie unter Herstellen von Verbindungen mit Datenquellen. Sie können externe Daten auch mithilfe des Lakehouse-Verbunds für unterstützte Datenquellen laden. Da der Lakehouse-Verbund Databricks Runtime 13.3 LTS oder höher erfordert, muss Ihre Pipeline für die Verwendung des Lakehouse-Verbunds für die Verwendung des Vorschaukanals konfiguriert werden.

Einige Datenquellen bieten keine solche Unterstützung in SQL. Wenn Sie lakehouse Federation nicht mit einer dieser Datenquellen verwenden können, können Sie ein Python-Notizbuch verwenden, um Daten aus der Quelle aufzunehmen. Sie können Python- und SQL-Quellcode zur gleichen Delta Live Tables-Pipeline hinzufügen. Im folgenden Beispiel wird eine materialisierte Sicht deklariert, um auf den aktuellen Zustand von Daten in einer PostgreSQL-Remotetabelle zuzugreifen:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Laden kleiner oder statischer Datasets aus einem Cloudobjektspeicher

Sie können kleine oder statische Datasets mithilfe der Ladesyntax von Apache Spark laden. Delta Live Tables unterstützt alle Dateiformate, die von Apache Spark in Azure Databricks unterstützt werden. Eine vollständige Liste finden Sie unter Datenformatoptionen.

Die folgenden Beispiele veranschaulichen das Laden von JSON-Code zum Erstellen von Delta Live Tables-Tabellen:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Hinweis

Das SQL-Konstrukt SELECT * FROM format.`path`; ist in allen SQL-Umgebungen in Azure Databricks üblich. Es ist das empfohlene Muster für den direkten Dateizugriff mit SQL und Delta Live Tables.

Sicherer Zugriff auf Speicheranmeldeinformationen mit Geheimnissen in einer Pipeline

Sie können Azure Databricks-Geheimnisse verwenden, um Anmeldeinformationen wie Zugriffsschlüssel oder Kennwörter zu speichern. Verwenden Sie zum Konfigurieren des Geheimen in Ihrer Pipeline eine Spark-Eigenschaft in der Pipelineeinstellungen-Clusterkonfiguration. Siehe Konfigurieren der Compute für eine Delta Live Tables-Pipeline.

Im folgenden Beispiel wird ein Geheimschlüssel verwendet, um einen Zugriffsschlüssel zu speichern, der zum Lesen von Eingabedaten aus einem Azure Data Lake Storage Gen2 (ADLS Gen2)-Speicherkonto mithilfe des automatischen Ladens erforderlich ist. Sie können diese Methode verwenden, um alle geheimen Schlüssel zu konfigurieren, die von Ihrer Pipeline erforderlich sind, z. B. AWS-Schlüssel zum Zugriff auf S3 oder das Kennwort zu einem Apache Hive-Metastore.

Weitere Informationen zum Arbeiten mit Azure Data Lake Storage Gen2 finden Sie unter Herstellen einer Verbindung mit Azure Data Lake Storage Gen2 und Blob Storage.

Hinweis

Sie müssen dem spark.hadoop. Konfigurationsschlüssel das spark_conf Präfix hinzufügen, das den geheimen Wert festlegt.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> durch den Namen des ADLS Gen2-Speicherkontos.
  • <scope-name> mit dem Namen des Azure Databricks-Geheimnisbereichs.
  • <secret-name> mit dem Namen des Schlüssels, der den Zugriffsschlüssel für das Azure-Speicherkonto enthält.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> mit dem Namen des Azure-Speicherkontocontainers, der die Eingabedaten speichert.
  • <storage-account-name> durch den Namen des ADLS Gen2-Speicherkontos.
  • <path-to-input-dataset> mit dem Pfad zum Eingabedatensatz.

Laden von Daten aus Azure Event Hubs

Azure Event Hubs ist ein Datenstreamingdienst, der eine mit Apache Kafka kompatible Schnittstelle bereitstellt. Sie können den Kafka-Connector für strukturiertes Streaming verwenden, der in der Delta Live Tables-Runtime enthalten ist, um Nachrichten aus Azure Event Hubs zu laden. Weitere Informationen zum Laden und Verarbeiten von Nachrichten aus Azure Event Hubs finden Sie unter Verwenden von Azure Event Hubs als Delta Live Tables-Datenquelle.