Ausführen einer Delta Live Tables-Pipeline in einem Workflow
Sie können eine Delta Live Tables-Pipeline als Teil eines Datenverarbeitungsworkflows mit Databricks-Aufträgen, Apache Airflow oder Azure Data Factory ausführen.
Aufträge
Sie können mehrere Aufgaben in einem Databricks-Auftrag organisieren, um einen Datenverarbeitungsworkflow zu implementieren. Um eine Delta Live Tables-Pipeline in einem Auftrag zu nutzen, verwenden Sie die Pipeline-Aufgabe, wenn Sie einen Auftrag erstellen. Siehe Delta Live Tables-Pipeline-Aufgabe für Jobs.
Apache Airflow
Apache Airflow ist eine Open-Source-Lösung zum Verwalten und Planen von Datenworkflows. Airflow stellt Datenpipelines als gerichtete, azyklische Graphen (directed acyclic graphs, DAGs) von Vorgängen dar. Sie definieren einen Workflow in einer Python-Datei, und Airflow verwaltet die Planung und Ausführung. Informationen zum Installieren und Verwenden von Airflow bei Azure Databricks finden Sie unter Orchestrieren von Azure Databricks-Aufträgen mit Apache Airflow.
Verwenden Sie den DatabricksSubmitRunOperator, um eine Delta Live Tables-Pipeline als Teil eines Airflow-Workflows auszuführen.
Anforderungen
Folgendes ist erforderlich, um die Airflow-Unterstützung für Delta Live Tables verwenden zu können:
- Airflow Version 2.1.0 oder höher
- Das Databricks-Anbieterpaket Version 2.1.0 oder höher
Beispiel
Im folgenden Beispiel wird ein Airflow-DAG erstellt, der ein Update für die Delta Live Tables-Pipeline mit dem Bezeichner 8279d543-063c-4d63-9926-dae38e35ce8b
auslöst:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Ersetzen Sie CONNECTION_ID
durch den Bezeichner für eine Airflow-Verbindung mit Ihrem Arbeitsbereich.
Speichern Sie dieses Beispiel im airflow/dags
-Verzeichnis, und verwenden Sie die Airflow-Benutzeroberfläche, um den DAG anzuzeigen und auszulösen. Verwenden Sie die Delta Live Tables-Benutzeroberfläche, um die Details des Pipelineupdates anzuzeigen.
Azure Data Factory
Azure Data Factory ist ein cloudbasierter ETL-Dienst, mit dem Sie Datenintegrations- und Transformationsworkflows organisieren können. Azure Data Factory unterstützt das Ausführen von Azure Databricks-Aufgaben in einem Workflow direkt, unter anderem in Notebooks, JAR-Aufgaben und Python-Skripts. Sie können eine Pipeline auch in einem Workflow nutzen, indem Sie die API von Delta Live Tables aus einer Web-Aktivität von Azure Data Factory aufrufen. So lösen Sie beispielsweise ein Pipelineupdate von Azure Data Factory aus:
Erstellen Sie eine Data Factory, oder öffnen Sie eine vorhandene Data Factory.
Wenn die Erstellung abgeschlossen ist, öffnen Sie die Seite Ihrer Data Factory, und klicken Sie auf die Kachel Azure Data Factory Studio öffnen. Die Azure Data Factory-Benutzeroberfläche wird angezeigt.
Erstellen Sie eine neue Azure Data Factory-Pipeline, indem Sie auf der Benutzeroberfläche von Azure Data Factory Studio in der Dropdownliste Neu die Option Pipeline auswählen.
Erweitern Sie in der Toolbox Aktivitäten die Option Allgemein, und ziehen Sie die Web-Aktivität auf die Canvas der Pipeline. Klicken Sie auf die Registerkarte Einstellungen, und geben Sie die folgenden Werte ein:
Hinweis
Als bewährte Methode für die Sicherheit empfiehlt Databricks, dass Sie bei der Authentifizierung mit automatisierten Tools, Systemen, Skripten und Anwendungen persönliche Zugriffstoken verwenden, die zu Dienstprinzipalen und nicht zu Benutzern des Arbeitsbereichs gehören. Informationen zum Erstellen von Token für Dienstprinzipale finden Sie unter Verwalten von Token für einen Dienstprinzipal.
URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Ersetzen Sie
<get-workspace-instance>
.Ersetzen Sie
<pipeline-id>
durch den Pipelinebezeichner.Methode: Wählen Sie POST aus dem Dropdownmenü aus.
Header: Klicken Sie auf + Neu. Geben Sie
Authorization
in das Textfeld Name ein. Geben SieBearer <personal-access-token>
in das Textfeld Wert ein.Ersetzen Sie
<personal-access-token>
durch ein persönliches Azure Databricks-Zugriffstoken.Text: Geben Sie ein JSON-Dokument mit den Parametern ein, um zusätzliche Anforderungsparameter zu übergeben. Um beispielsweise ein Update zu starten und alle Daten für die Pipeline erneut zu verarbeiten, nutzen Sie
{"full_refresh": "true"}
. Wenn keine zusätzlichen Anforderungsparameter vorhanden sind, geben Sie leere geschweifte Klammern ein ({}
).
Um die Web-Aktivität zu testen, klicken Sie auf der Pipelinesymbolleiste auf der Data Factory-Benutzeroberfläche auf Debuggen. Die Ausgabe und der Status der Ausführung, einschließlich Fehler, werden auf der Registerkarte Ausgabe der Azure Data Factory-Pipeline angezeigt. Verwenden Sie die Delta Live Tables-Benutzeroberfläche, um die Details des Pipelineupdates anzuzeigen.
Tipp
Eine häufige Workflowanforderung ist das Starten einer Aufgabe nach Abschluss einer vorherigen Aufgabe. Da die Delta Live Tables-Anforderung updates
asynchron ist, d. h. die Anforderung nach dem Starten des Updates, jedoch vor Abschluss zurückgegeben wird, müssen Aufgaben in Ihrer Azure Data Factory-Pipeline mit einer Abhängigkeit vom Delta Live Tables-Update auf den Abschluss des Updates warten. Eine Option beim Warten auf den Abschluss des Updates ist das Hinzufügen einer Until-Aktivität nach der Web-Aktivität, die das Delta Live Tables-Update auslöst. Führen Sie Folgendes in der Until-Aktivität durch:
- Fügen Sie eine Wait-Aktivität hinzu, um eine konfigurierte Anzahl von Sekunden auf den Abschluss des Updates zu warten.
- Fügen Sie eine Web-Aktivität nach der Wait-Aktivität hinzu, die die Anforderung Updatedetails abrufen für Delta Live Tables verwendet, um den Status des Updates abzurufen. Das Feld
state
in der Antwort gibt den aktuellen Status des Updates zurück, unter anderem, ob es abgeschlossen ist. - Verwenden Sie den Wert des Felds
state
, um die abschließende Bedingung der Until-Aktivität festzulegen. Sie können auch eine Set Variable-Aktivität verwenden, um eine Pipelinevariable basierend auf demstate
-Wert hinzuzufügen und diese Variable für die abschließende Bedingung zu nutzen.