Een Unity Catalog-pijplijn maken door een Hive-metastore-pijplijn te klonen
In dit artikel wordt de clone a pipeline
aanvraag in de Databricks REST API beschreven en hoe u deze kunt gebruiken om een bestaande pijplijn te kopiƫren die naar de Hive-metastore wordt gepubliceerd naar een nieuwe pijplijn die naar Unity Catalog wordt gepubliceerd. Wanneer u de clone a pipeline
-aanvraag aanroept, gebeurt het volgende:
- Kopieert de broncode en configuratie van de bestaande pijplijn naar een nieuwe, waarbij eventuele configuratie-onderdrukkingen worden toegepast die u hebt opgegeven.
- Gerealiseerde weergave- en streamingtabeldefinities en -verwijzingen worden bijgewerkt met de vereiste wijzigingen zodat deze objecten beheerd kunnen worden door Unity Catalog.
- Hiermee start u een update van de pijplijn om bestaande gegevens en metagegevens, zoals controlepunten, te migreren voor alle streamingtabellen in de pijplijn. Hierdoor kunnen deze streamingtabellen de verwerking hervatten op hetzelfde punt als de oorspronkelijke pijplijn.
Nadat de kloonbewerking is voltooid, kunnen zowel de oorspronkelijke als de nieuwe pijplijnen onafhankelijk worden uitgevoerd.
Dit artikel bevat voorbeelden van het rechtstreeks aanroepen van de API-aanvraag en via een Python-script vanuit een Databricks-notebook.
Voordat u begint
Het volgende is vereist voordat u een pijplijn kloont:
Als u een Hive-metastore-pijplijn wilt klonen, moeten de tabellen en weergaven die in de pijplijn zijn gedefinieerd, tabellen publiceren naar een doelschema. Zie Een pijplijn configureren om te publiceren naar Hive-metastorevoor meer informatie over het toevoegen van een doelschema aan een pijplijn.
Verwijzingen naar door Hive metastore beheerde tabellen of weergaven in de pijplijn die moeten worden gekloond, moeten volledig zijn gekwalificeerd met de catalogus (
hive_metastore
), schema en tabelnaam. In de volgende code die bijvoorbeeld eencustomers
gegevensset maakt, moet het argument tabelnaam worden bijgewerkt naarhive_metastore.sales.customers
:@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
Bewerk de broncode voor de Hive-metastore-bronpijplijn niet terwijl er een kloonbewerking wordt uitgevoerd, inclusief notebooks die zijn geconfigureerd als onderdeel van de pijplijn en modules die zijn opgeslagen in Git-mappen of werkruimtebestanden.
De Hive metastore bronpijplijn mag niet draaien wanneer u de kloonbewerking start. Als een update wordt uitgevoerd, stopt u deze of wacht u totdat deze is voltooid.
Hier volgen andere belangrijke overwegingen voordat u een pijplijn kloont:
- Als tabellen in de Hive-metastore-pijplijn een opslaglocatie opgeven met behulp van het argument
path
in Python ofLOCATION
in SQL, geeft u de"pipelines.migration.ignoreExplicitPath": "true"
-configuratie door aan de kloonaanvraag. Het instellen van deze configuratie vindt u in de onderstaande instructies. - Als de Hive-metastore-pijplijn een Bron voor automatisch laden bevat waarmee een waarde voor de
cloudFiles.schemaLocation
optie wordt opgegeven en de Hive-metastore-pijplijn operationeel blijft na het maken van de Unity Catalog-kloon, moet u de optiemergeSchema
instellen optrue
in zowel de Hive-metastore-pijplijn als de gekloonde Unity Catalog-pijplijn. Als u deze optie toevoegt aan de Hive-metastore-pijplijn voordat u kloont, wordt de optie naar de nieuwe pijplijn gekopieerd.
Een pijplijn klonen met de Databricks REST API
In het volgende voorbeeld wordt de opdracht curl
gebruikt om de clone a pipeline
aanvraag aan te roepen in de Databricks REST API:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
Vervangen:
-
<personal-access-token>
met een Databricks--persoonlijk toegangstoken. -
<databricks-instance>
met de naam van het Azure Databricks-werkruimte-exemplaar, bijvoorbeeldadb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
met de unieke id van de Hive-metastore-pijplijn die moet worden gekloond. U vindt de pijplijn-id in de gebruikersinterface van Delta Live Tables.
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
Vervangen:
-
<target-catalog-name>
met de naam van een catalogus in Unity Catalog waarnaar de nieuwe pijplijn gepubliceerd moet worden. Dit moet een bestaande catalogus zijn. -
<target-schema-name>
met de naam van een schema in Unity Catalog waarnaar de nieuwe pijplijn moet publiceren als deze anders is dan de huidige schemanaam. Deze parameter is optioneel en indien niet opgegeven, wordt de bestaande schemanaam gebruikt. -
<new-pipeline-name>
met een optionele naam voor de nieuwe pijplijn. Als er geen specifieke naam is opgegeven, krijgt de nieuwe pijplijn de naam van de bronpijplijn met de toevoeging[UC]
.
clone_mode
geeft de modus op die moet worden gebruikt voor de kloonbewerking.
MIGRATE_TO_UC
is de enige ondersteunde optie.
Gebruik het veld configuration
om configuraties op te geven voor de nieuwe pijplijn. De waarden die hier worden ingesteld, overschrijven configuraties in de oorspronkelijke pijplijn.
Het antwoord van de clone
REST API-aanvraag is de pijplijn-id van de nieuwe Unity Catalog-pijplijn.
Een pijplijn klonen vanuit een Databricks-notebook
In het volgende voorbeeld wordt de create a pipeline
-aanvraag aanroepen vanuit een Python-script. U kunt een Databricks-notebook gebruiken om dit script uit te voeren:
- Maak een nieuw notebook voor het script. Zie Een notitieblok maken.
- Kopieer het volgende Python-script naar de eerste cel van het notebook.
- Werk de waarden van de tijdelijke aanduidingen in het script bij door het volgende te vervangen:
-
<databricks-instance>
met de naam van het Azure Databricks-werkruimte-exemplaar, bijvoorbeeldadb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
met de unieke id van de Hive-metastore-pijplijn die moet worden gekloond. U vindt de pijplijn-id in de gebruikersinterface van Delta Live Tables. -
<target-catalog-name>
met de naam van een catalogus in Unity Catalog waarnaar de nieuwe pijplijn gepubliceerd moet worden. Dit moet een bestaande catalogus zijn. -
<target-schema-name>
met de naam van een schema in Unity Catalog waarnaar de nieuwe pijplijn moet publiceren als deze anders is dan de huidige schemanaam. Deze parameter is optioneel en indien niet opgegeven, wordt de bestaande schemanaam gebruikt. -
<new-pipeline-name>
met een optionele naam voor de nieuwe pijplijn. Als er geen specifieke naam is opgegeven, krijgt de nieuwe pijplijn de naam van de bronpijplijn met de toevoeging[UC]
.
-
- Voer het script uit. Zie Voer Databricks-notebooks uit.
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
Beperkingen
Hier volgen beperkingen van de Delta Live Tables clone a pipeline
API-aanvraag:
- Alleen klonen vanuit een pijplijn die is geconfigureerd voor het gebruik van de Hive-metastore naar een Unity Catalog-pijplijn, wordt ondersteund.
- U kunt alleen een kloon maken in dezelfde Azure Databricks-werkruimte als de pijplijn waaruit u kloont.
- De pijplijn die u kloont, kan alleen de volgende streamingbronnen bevatten:
- Delta-bronnen
- Automatisch laden, inclusief alle gegevensbronnen die worden ondersteund door Auto Loader. Zie Bestanden laden vanuit de opslag van cloudobjecten.
- Apached Kafka met Structured Streaming. De Kafka-bron kan echter niet worden geconfigureerd voor het gebruik van de optie
kafka.group.id
. Zie Stream-verwerking met Apache Kafka en Azure Databricks. - Amazon Kinesis met Gestructureerde Streaming. De Kinesis-bron kan echter niet worden geconfigureerd om
consumerMode
in te stellen opefo
.
- Als de Hive-metastore-pijplijn die u kopieert gebruikmaakt van de meldingsmodus voor het automatisch laden van bestanden, raadt Databricks aan om de Hive-metastore-pijplijn niet uit te voeren na het klonen. Dit komt doordat het uitvoeren van de Hive-metastore-pijplijn leidt tot het wegvallen van enkele bestandseventmeldingen van de Unity Catalog-kloon. Als de Hive-metastore-bronpijplijn wordt uitgevoerd nadat de kloonbewerking is voltooid, kunt u ontbrekende bestanden aanvullen met behulp van Auto Loader met de optie
cloudFiles.backfillInterval
. Leer meer over de meldingsmodus voor Auto Loader-bestanden in Wat is de meldingsmodus voor Auto Loader-bestanden?. Kijk naar Reguliere backfills activeren met cloudFiles.backfillInterval en Algemene Auto Loader-optiesvoor meer informatie over het backfillen van bestanden met Auto Loader. - Onderhoudstaken voor pijplijnen worden automatisch onderbroken voor beide pijplijnen terwijl het klonen wordt uitgevoerd.
- Het volgende is van toepassing op tijdreisquery's op tabellen in de gekloonde Unity Catalog-pijplijn:
- Als een tabelversie oorspronkelijk is geschreven naar een beheerd Hive-metastore-object, zijn tijdreisquery's met behulp van een
timestamp_expression
-component niet gedefinieerd bij het uitvoeren van een query op het gekloonde Unity Catalog-object. - Als de tabelversie echter naar het gekloonde Unity Catalog-object is geschreven, werken tijdreisquery's met behulp van een
timestamp_expression
-component correct. - Query's voor tijdreizen met behulp van een
version
-component werken correct bij het uitvoeren van query's op een gekloond Unity Catalog-object, zelfs wanneer de versie oorspronkelijk naar het beheerde Hive-metastore-object is geschreven.
- Als een tabelversie oorspronkelijk is geschreven naar een beheerd Hive-metastore-object, zijn tijdreisquery's met behulp van een
- Zie Beperkingen van de Unity Catalog-pijplijn voor andere beperkingen bij het gebruik van Delta Live Tables met Unity Catalog.
- Zie Beperkingen voor Unity Catalogvoor beperkingen voor Unity Catalog.