Erstellen einer Unity-Katalogpipeline durch Klonen einer Hive-Metastore-Pipeline
Dieser Artikel beschreibt die clone a pipeline
Anforderung in der Databricks-REST-API und wie Sie sie verwenden können, um eine vorhandene Pipeline zu kopieren, die im Hive-Metaspeicher veröffentlicht wird, in eine neue Pipeline, die im Unity-Katalog veröffentlicht wird. Wenn Sie die clone a pipeline
-Anforderung aufrufen, passiert Folgendes:
- Kopiert den Quellcode und die Konfiguration aus der vorhandenen Pipeline in eine neue, wobei alle von Ihnen angegebenen Konfigurationsüberschreibungen angewendet werden.
- Aktualisiert Definitionen und Verweise von materialisierten Sichten und Streamingtabellen mit den erforderlichen Änderungen für diese Objekte, die von Unity Catalog verwaltet werden sollen.
- Startet eine Pipelineaktualisierung zum Migrieren der vorhandenen Daten und Metadaten, z. B. Prüfpunkte, für alle Streamingtabellen in der Pipeline. Auf diese Weise können diese Streamingtabellen die Verarbeitung am selben Punkt wie die ursprüngliche Pipeline fortsetzen.
Nach Abschluss des Klonvorgangs können sowohl die ursprünglichen als auch die neuen Pipelines unabhängig laufen.
Dieser Artikel enthält Beispiele für das direkte Aufrufen der API-Anforderung und über ein Python-Skript aus einem Databricks-Notizbuch.
Voraussetzungen
Die folgenden Schritte sind vor dem Klonen einer Pipeline erforderlich:
Um eine Hive-Metastore-Pipeline zu klonen, müssen die in der Pipeline definierten Tabellen und Sichten Tabellen in einem Zielschema veröffentlichen. Informationen zum Hinzufügen eines Zielschemas zu einer Pipeline finden Sie unter Konfigurieren einer Pipeline zum Veröffentlichen in Hive Metastore.
Verweise auf im Hive-Metastore verwaltete Tabellen oder Ansichten in der zu klonenden Pipeline müssen vollständig mit dem Katalognamen (
hive_metastore
), dem Schema und dem Tabellennamen qualifiziert sein. Im folgenden Code zum Erstellen einescustomers
-Datasets muss das Tabellennamenargument beispielsweise aufhive_metastore.sales.customers
aktualisiert werden:@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
Bearbeiten Sie nicht den Quellcode für die Quell-Hive-Metaspeicherpipeline, während ein Klonvorgang ausgeführt wird, einschließlich der als Teil der Pipeline konfigurierten Notizbücher und der in Git-Ordnern oder Arbeitsbereichsdateien gespeicherten Module.
Die Hive-Metastore-Quellpipeline darf nicht ausgeführt werden, wenn Sie den Klonvorgang starten. Wenn ein Update ausgeführt wird, beenden Sie es, oder warten Sie, bis es abgeschlossen ist.
Im Folgenden finden Sie weitere wichtige Überlegungen vor dem Klonen einer Pipeline:
- Wenn Tabellen in der Hive-Metastore-Pipeline einen Speicherort mithilfe des arguments
path
in Python oderLOCATION
in SQL angeben, übergeben Sie die"pipelines.migration.ignoreExplicitPath": "true"
Konfiguration an die Klonanforderung. Das Festlegen dieser Konfiguration ist in den nachstehenden Anweisungen enthalten. - Wenn die Hive-Metastorepipeline eine Auto Loader-Quelle enthält, die einen Wert für die Option
cloudFiles.schemaLocation
angibt, und die Hive-Metastore-Pipeline bleibt nach dem Erstellen des Unity-Katalogklons betriebsbereit, müssen Sie die OptionmergeSchema
auftrue
sowohl in der Hive-Metastore-Pipeline als auch in der geklonten Unity-Katalogpipeline festlegen. Durch Hinzufügen dieser Option zur Hive-Metastore-Pipeline vor dem Klonen wird die Option in die neue Pipeline kopiert.
Klonen einer Pipeline mit der Databricks-REST-API
Im folgenden Beispiel wird der Befehl curl
verwendet, um die clone a pipeline
Anforderung in der Databricks-REST-API aufzurufen:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
Ersetzen Sie Folgendes:
<personal-access-token>
mit einem persönlichen Zugriffstoken für Databricks.<databricks-instance>
mit dem Namen der Azure Databricks-Arbeitsbereichsinstanz, zum Beispieladb-1234567890123456.7.azuredatabricks.net
<pipeline-id>
mit dem eindeutigen Bezeichner der Hive-Metastore-Pipeline, die geklont werden soll. Sie finden die Pipeline-ID in der Delta Live Tables UI.
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
Ersetzen Sie Folgendes:
<target-catalog-name>
mit dem Namen eines Katalogs in Unity Catalog, in dem die neue Pipeline veröffentlicht werden soll. Dies muss ein vorhandener Katalog sein.<target-schema-name>
mit dem Namen eines Schemas im Unity-Katalog, in dem die neue Pipeline veröffentlicht werden soll, wenn sie sich vom aktuellen Schemanamen unterscheidet. Dieser Parameter ist optional, und wenn nicht angegeben, wird der vorhandene Schemaname verwendet.<new-pipeline-name>
mit einem optionalen Namen für die neue Pipeline. Wenn nicht angegeben, wird die neue Pipeline mit dem Namen der Quellpipeline benannt, wobei[UC]
angefügt wurde.
clone_mode
gibt den Modus an, der für den Klonvorgang verwendet werden soll. MIGRATE_TO_UC
ist die einzige unterstützte Option.
Verwenden Sie das Feld configuration
, um Konfigurationen für die neue Pipeline anzugeben. Die hier festgelegten Werte überschreiben Konfigurationen in der ursprünglichen Pipeline.
Die Antwort der clone
REST-API-Anforderung ist die Pipeline-ID der neuen Unity-Katalogpipeline.
Klonen einer Pipeline aus einem Databricks-Notizbuch
Im folgenden Beispiel wird die create a pipeline
Anforderung aus einem Python-Skript aufgerufen. Sie können ein Databricks-Notizbuch verwenden, um dieses Skript auszuführen:
- Erstellen Sie ein neues Notizbuch für das Skript. Weitere Informationen finden Sie unter Erstellen eines Notebooks.
- Kopieren Sie das folgende Python-Skript in die erste Zelle des Notizbuchs.
- Aktualisieren Sie die Platzhalterwerte im Skript, indem Sie Folgendes ersetzen:
<databricks-instance>
mit dem Namen der Azure Databricks-Arbeitsbereichsinstanz, z. B.adb-1234567890123456.7.azuredatabricks.net
<pipeline-id>
mit dem eindeutigen Bezeichner der Hive-Metastore-Pipeline, die geklont werden soll. Sie finden die Pipeline-ID in der Delta Live Tables UI.<target-catalog-name>
mit dem Namen eines Katalogs im Unity-Katalog, in dem die neue Pipeline veröffentlicht werden soll. Dies muss ein vorhandener Katalog sein.<target-schema-name>
mit dem Namen eines Schemas im Unity-Katalog, in das die neue Pipeline veröffentlicht werden soll, falls es sich vom aktuellen Schemanamen unterscheidet. Dieser Parameter ist optional, und wenn nicht angegeben, wird der vorhandene Schemaname verwendet.<new-pipeline-name>
mit einem optionalen Namen für die neue Pipeline. Wenn nicht angegeben, wird die neue Pipeline mit dem Namen der Quellpipeline benannt, wobei[UC]
angefügt wurde.
- Führen Sie das Skript aus. Weitere Informationen finden Sie unter Ausführen von Databricks-Notebooks.
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
Einschränkungen
Es folgen Einschränkungen der Delta Live Tables clone a pipeline
API-Anforderung:
- Nur das Klonen einer Pipeline, die zur Verwendung mit dem Hive-Metastore konfiguriert ist, in eine Unity Catalog-Pipeline wird unterstützt.
- Sie können einen Klon nur im gleichen Azure Databricks-Arbeitsbereich wie die Pipeline erstellen, aus der Sie klonen.
- Die Pipeline, die Sie klonen, kann nur die folgenden Streamingquellen enthalten:
- Deltaquellen
- Auto Loader, einschließlich aller Datenquellen, die vom automatischen Laden unterstützt werden. Siehe Laden von Dateien aus dem Cloudobjektspeicher.
- Apached Kafka mit strukturiertem Streaming. Die Kafka-Quelle kann jedoch nicht für die Verwendung der option
kafka.group.id
konfiguriert werden. Siehe Stream-Verarbeitung mit Apache Kafka und Azure Databricks. - Amazon Kinesis mit strukturiertem Streaming. Die Kinesis-Quelle kann jedoch nicht so konfiguriert werden, dass
consumerMode
aufefo
festgelegt wird.
- Wenn die von Ihnen geklonte Hive-Metastore-Pipeline den Benachrichtigungsmodus zum automatischen Laden von Dateien verwendet, empfiehlt Databricks, die Hive-Metastore-Pipeline nicht auszuführen, nachdem sie geklont wurde. Dies liegt daran, dass das Ausführen der Hive-Metastore-Pipeline dazu führt, dass einige Dateibenachrichtigungsereignisse aus dem Unity Catalog-Klon verloren gehen. Wenn die Hive Metastore-Quellpipeline ausgeführt wird, nachdem der Klonvorgang abgeschlossen ist, können Sie fehlende Dateien mit Auto Loader und der Option
cloudFiles.backfillInterval
auffüllen. Informationen zum Benachrichtigungsmodus der Auto Loader-Datei finden Sie unter Was ist der Benachrichtigungsmodus für Auto Loader-Dateien?. Informationen zum Backfilling von Dateien mit Auto Loader finden Sie unter Auslösen regulärer Backfills mit cloudFiles.backfillInterval und Allgemeinen Optionen für das automatische Laden. - Pipelinewartungsaufgaben werden für beide Pipelines automatisch angehalten, während das Klonen durchgeführt wird.
- Die folgenden Punkte gelten für Zeitreiseabfragen für Tabellen in der geklonten Unity-Katalogpipeline:
- Wenn eine Tabellenversion ursprünglich in ein verwaltetes Hive-Metastore-Objekt geschrieben wurde, werden Zeitreiseabfragen mit einer
timestamp_expression
Klausel beim Abfragen des geklonten Unity Catalog-Objekts nicht definiert. - Wenn die Tabellenversion jedoch in das geklonte Unity Catalog-Objekt geschrieben wurde, funktionieren Zeitreiseabfragen mit einer
timestamp_expression
Klausel ordnungsgemäß. - Zeitreiseabfragen, die eine
version
Klausel verwenden, funktionieren ordnungsgemäß, wenn ein geklontes Unity Catalog-Objekt abgefragt wird, auch wenn die Version ursprünglich in das verwaltete Objekt des Hive-Metastores geschrieben wurde.
- Wenn eine Tabellenversion ursprünglich in ein verwaltetes Hive-Metastore-Objekt geschrieben wurde, werden Zeitreiseabfragen mit einer
- Weitere Einschränkungen bei der Verwendung von Delta Live Tables mit Unity Catalog finden Sie unter Einschränkungen der Unity Catalog-Pipeline.
- Einschränkungen des Unity-Katalogs finden Sie unter Unity-Katalogeinschränkungen.