Freigeben über


Laden von Daten mithilfe von Streamingtabellen in Databricks SQL

Databricks empfiehlt die Verwendung von Streamingtabellen zum Erfassen von Daten mit Databricks SQL. Eine Streamingtabelle ist eine bei Unity Catalog registrierte Tabelle mit zusätzlicher Unterstützung für Streaming oder inkrementelle Datenverarbeitung. Für jede Streamingtabelle wird automatisch eine Delta Live Tabellen Pipeline erstellt. Sie können Streamingtabellen für das inkrementelle Laden von Daten von Kafka und vom Cloudobjektspeicher verwenden.

In diesem Artikel wird die Verwendung von Streamingtabellen zum Laden von Daten aus dem Cloudobjektspeicher veranschaulicht, der als Unity Catalog-Volume (empfohlen) oder externer Speicherort konfiguriert ist.

Hinweis

Informationen zum Verwenden von Delta Lake-Tabellen als Streamingquellen und -senken finden Sie unter Delta-Tabelle: Streaming für Lese- und Schreibvorgänge.

Wichtig

In Databricks SQL erstellte Streamingtabellen werden von einer serverlosen Delta Live Tables-Pipeline unterstützt. Ihr Arbeitsbereich muss serverlose Pipelines unterstützen, um diese Funktionalität zu verwenden.

Vorbemerkungen

Bevor Sie beginnen, müssen die folgenden Voraussetzungen erfüllt sein.

Anforderungen an den Arbeitsbereich:

Computeanforderungen:

Sie müssen eine der folgenden Voraussetzungen verwenden:

  • Ein SQL-Warehouse, das den Current-Kanal verwendet.

  • Compute im Modus für gemeinsamen Zugriff in Databricks Runtime 13.3 LTS oder höher.

  • Compute with single user access mode on Databricks Runtime 15.4 LTS or above.

    Auf Databricks Runtime 15.3 und unten können Sie keine einzelnen Benutzer berechnen, um Streamingtabellen abzufragen, die im Besitz anderer Benutzer sind. Sie können single user compute on Databricks Runtime 15.3 and below only if you own the streaming table verwenden. Der Ersteller der Tabelle ist der Besitzer.

    Databricks Runtime 15.4 LTS und höher unterstützen Abfragen zu Delta Live Tables-generierten Tabellen bei der Berechnung einzelner Benutzer, unabhängig vom Tabellenbesitz. Um die in Databricks Runtime 15.4 LTS und höher bereitgestellten Datenfilter nutzen zu können, müssen Sie auch überprüfen, ob Ihr Arbeitsbereich für serverloses Computing aktiviert ist, da die Datenfilterfunktion, die von Delta Live Tables generierte Tabellen unterstützt, auf serverlosem Computing ausgeführt wird. Sie können serverlose Computeressourcen in Rechnung stellen, wenn Sie datenfilterungsvorgänge mit einem einzelnen Benutzer ausführen. Weitere Informationen finden Sie unter Feinkornierte Zugriffssteuerung für die Berechnung einzelner Benutzer.

Berechtigungsanforderungen:

  • Die READ FILES-Berechtigung für einen externen Unity Catalog-Speicherort. Weitere Informationen finden Sie unter Erstellen eines externen Speicherorts zum Verbinden des Cloudspeichers mit Azure Databricks.
  • Die USE CATALOG-Berechtigung für den Katalog, in dem Sie die Streamingtabelle erstellen.
  • Die USE SCHEMA-Berechtigung für das Schema, in dem Sie die Streamingtabelle erstellen.
  • Die CREATE TABLE-Berechtigung für das Schema, in dem Sie die Streamingtabelle erstellen.

Sonstige Anforderungen:

  • Der Pfad zu Ihren Quelldaten.

    Volumepfadbeispiel: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Beispiel für externen Speicherortpfad: abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Hinweis

    In diesem Artikel wird davon ausgegangen, dass sich die Daten, die Sie laden möchten, an einem Cloudspeicherort befinden, der einem Unity Catalog-Volume oder einem externen Speicherort entspricht, auf den Sie Zugriff haben.

Ermitteln und Anzeigen einer Vorschau von Quelldaten

  1. Klicken Sie in der Seitenleiste Ihres Arbeitsbereichs auf Abfragen, und klicken Sie dann auf Abfrage erstellen.

  2. Wählen Sie im Abfrage-Editor in der Dropdownliste ein SQL-Warehouse aus, das den Current-Kanal verwendet.

  3. Fügen Sie Folgendes in den Editor ein. Ersetzen Sie dabei die Werte in spitzen Klammern (<>) durch die Informationen, die Ihre Quelldaten identifizieren, und klicken Sie dann auf Ausführen.

    Hinweis

    Beim Ausführen der Tabellenwertfunktion read_files treten möglicherweise Schemarückschlussfehler auf, wenn die Standardwerte für die Funktion Ihre Daten nicht analysieren können. Beispielsweise müssen Sie möglicherweise den mehrzeiligen Modus für mehrzeilige CSV- oder JSON-Dateien konfigurieren. Eine Liste der Parseroptionen finden Sie unter Tabellenwertfunktion read_files.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

Laden von Daten in eine Streamingtabelle

Um eine Streamingtabelle aus Daten im Cloudobjektspeicher zu erstellen, fügen Sie Folgendes in den Abfrage-Editor ein, und klicken Sie dann auf Ausführen:

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

Festlegen des Laufzeitkanals

Streamingtabellen, die mit SQL-Warehouses erstellt wurden, werden automatisch mithilfe einer Delta Live Tables-Pipeline aktualisiert. Delta Live Tables-Pipelines verwenden standardmäßig die Laufzeit im current Kanal. Lesen Sie die Versionshinweise zu Delta Live Tables und den Prozess des Releaseupgrades, um mehr über den Veröffentlichungsprozess zu erfahren.

Databricks empfiehlt die Verwendung des current Kanals für Produktionsworkloads. Neue Features werden zuerst im preview Kanal veröffentlicht. Sie können eine Pipeline auf den Vorschaukanal Delta Live Tables festlegen, um neue Features zu testen, indem Sie als Tabelleneigenschaft angeben preview . Sie können diese Eigenschaft angeben, wenn Sie die Tabelle erstellen oder nachdem die Tabelle mithilfe einer ALTER-Anweisung erstellt wurde.

Das folgende Codebeispiel zeigt, wie Sie den Kanal in einer CREATE-Anweisung auf eine Vorschau festlegen:

CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
  *
FROM
  range(5)

Aktualisieren einer Streamingtabelle mithilfe einer DLT-Pipeline

In diesem Abschnitt werden Muster zum Aktualisieren einer Streamingtabelle mit den neuesten Daten beschrieben, die aus den in der Abfrage definierten Quellen verfügbar sind.

Wenn Sie eine Streamingtabelle CREATE oder REFRESH verwenden, wird die Aktualisierung mithilfe einer serverlosen Delta Live Tables-Pipeline verarbeitet. Jede von Ihnen definierte Streamingtabelle verfügt über eine zugeordnete Delta Live Tables-Pipeline.

Nachdem Sie den Befehl REFRESH ausgeführt haben, wird die DLT-Pipelineverbindung zurückgegeben. Sie können den DLT-Pipelinelink verwenden, um den Status der Aktualisierung zu überprüfen.

Hinweis

Nur der Tabellenbesitzer kann eine Streamingtabelle aktualisieren, um die neuesten Daten abzurufen. Der Benutzer, der die Tabelle erstellt, ist der Besitzer, und der Besitzer kann nicht geändert werden. Möglicherweise müssen Sie Ihre Streamingtabelle aktualisieren, bevor Sie Zeitreise-Abfragen verwenden.

Weitere Informationen finden Sie unter Was sind Delta Live Tables?.

Nur neue Daten aufnehmen

Standardmäßig liest die read_files-Funktion alle vorhandenen Daten im Quellverzeichnis während der Tabellenerstellung und verarbeitet dann neu eingehende Datensätze mit jeder Aktualisierung.

Um das Aufnehmen von Daten zu vermeiden, die zum Zeitpunkt der Tabellenerstellung bereits im Quellverzeichnis vorhanden sind, legen Sie die Option includeExistingFiles auf falsefest. Dies bedeutet, dass nur Daten verarbeitet werden, die nach der Tabellenerstellung im Verzeichnis eingehen. Beispiel:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

Vollständiges Aktualisieren einer Streamingtabelle

Vollständige Aktualisierungen verarbeiten alle in der Quelle verfügbaren Daten mit der neuesten Definition erneut. Es wird nicht empfohlen, vollständige Aktualisierungen für Quellen aufzurufen, die nicht den gesamten Datenverlauf beibehalten oder kurze Aufbewahrungszeiträume aufweisen, wie z. B. Kafka, da durch eine vollständige Aktualisierung die vorhandenen Daten abgeschnitten werden. Möglicherweise können Sie alte Daten nicht wiederherstellen, wenn die Daten in der Quelle nicht mehr verfügbar sind.

Beispiel:

REFRESH STREAMING TABLE my_bronze_table FULL

Planen einer Streamingtabelle für die automatische Aktualisierung

Um eine Streamingtabelle für die automatische Aktualisierung basierend auf einem definierten Zeitplan zu konfigurieren, fügen Sie Folgendes in den Abfrage-Editor ein, und klicken Sie dann auf Ausführen:

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Ein Beispiel für Abfragen von Aktualisierungszeitplänen finden Sie unter ALTER STREAMING TABLE.

Nachverfolgen des Status einer Aktualisierung

Sie können den Status einer Streamingtabellenaktualisierung anzeigen, indem Sie die Pipeline anzeigen, die die Streamingtabelle auf der Benutzeroberfläche von Delta Live Tables verwaltet, oder indem Sie die vom -Befehl für die Streamingtabelle zurückgegebenen DESCRIBE EXTENDED anzeigen.

DESCRIBE EXTENDED <table-name>

Streamingerfassung von Kafka

Ein Beispiel für die Streamingerfassung von Kafka finden Sie unter read_kafka.

Gewähren des Zugriffs auf eine Streamingtabelle für Benutzer*innen

Um Benutzer*innen die SELECT-Berechtigung für die Streamingtabelle zu gewähren, damit sie sie abfragen können, fügen Sie Folgendes in den Abfrage-Editor ein, und klicken Sie dann auf Ausführen:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Weitere Informationen zum Gewähren von Berechtigungen für sicherungsfähige Unity Catalog-Objekte finden Sie unter Unity Catalog-Berechtigungen und sicherungsfähige Objekte.

Überwachen der Ausführung mithilfe des Abfrageverlaufs

Sie können die Abfrageverlaufsseite verwenden, um auf Abfragedetails und Abfrageprofile zuzugreifen, die Ihnen dabei helfen können, abfragen und Engpässe in der Delta Live Tables-Pipeline zu identifizieren, die zum Ausführen Ihrer Streamingtabellenaktualisierungen verwendet werden. Eine Übersicht über die Art der informationen, die in Abfragehistorien und Abfrageprofilen verfügbar sind, finden Sie unter "Abfrageverlauf " und "Abfrageprofil".

Wichtig

Dieses Feature befindet sich in der Public Preview. Arbeitsbereichsadministratoren und -administratorinnen können dieses Feature über die Seite Vorschau aktivieren. Weitere Informationen finden Sie unter Verwalten von Azure Databricks Previews.

Alle Anweisungen im Zusammenhang mit Streamingtabellen werden im Abfrageverlauf angezeigt. Sie können den Dropdownfilter "Anweisung " verwenden, um einen beliebigen Befehl auszuwählen und die zugehörigen Abfragen zu überprüfen. Auf alle CREATE Anweisungen folgt eine REFRESH Anweisung, die asynchron in einer Delta Live Tables-Pipeline ausgeführt wird. Die REFRESH Anweisungen enthalten in der Regel detaillierte Abfragepläne, die Einblicke in die Optimierung der Leistung bieten.

Führen Sie die folgenden Schritte aus, um auf Anweisungen in der Abfrageverlaufs-UI zuzugreifen REFRESH :

  1. Klicken Sie Verlaufssymbol auf die linke Randleiste, um die Benutzeroberfläche für den Abfrageverlauf zu öffnen.
  2. Aktivieren Sie das Kontrollkästchen REFRESH im Dropdownfilter Anweisung.
  3. Klicken Sie auf den Namen der Abfrage-Anweisung, um Zusammenfassungsdetails wie die Dauer der Abfrage und aggregierte Metriken anzuzeigen.
  4. Klicken Sie auf " Abfrageprofil anzeigen", um das Abfrageprofil zu öffnen. Details zum Navigieren im Abfrageprofil finden Sie im Abfrageprofil .
  5. Optional können Sie die Verknüpfungen im Abschnitt "Abfragequelle " verwenden, um die zugehörige Abfrage oder Pipeline zu öffnen.

Sie können auch mithilfe von Links im SQL-Editor oder aus einem Notizbuch, das an ein SQL Warehouse angefügt ist, auf Abfragedetails zugreifen.

Hinweis

Ihre Streamingtabelle muss für die Ausführung mithilfe des Vorschaukanals konfiguriert werden. Siehe Festlegen des Laufzeitkanals.

Zusätzliche Ressourcen