Vytvořte datový kanál Unity Catalog klonováním Hive metastore pipeline.
Tento článek popisuje požadavek clone a pipeline
v rozhraní Databricks REST API a jak ho můžete použít ke zkopírování existujícího kanálu, který publikuje do metastoru Hive, do nového kanálu, který publikuje do katalogu Unity. Když zpracujete požadavek clone a pipeline
, bude:
- Zkopíruje zdrojový kód a konfiguraci z existujícího kanálu do nového a aplikuje všechny zadané úpravy konfigurace.
- Aktualizuje definice materializovaných zobrazení a streamovaných tabulek a odkazy s požadovanými změnami pro tyto objekty, které mají být spravovány katalogem Unity.
- Spustí aktualizaci potrubí pro migraci existujících dat a metadat, jako jsou kontrolní body, pro všechny streamované tabulky v potrubí. To umožňuje, aby streamovací tabulky pokračovaly ve zpracování ve stejném bodě jako původní zpracovatelská linka.
Po dokončení operace klonování se mohou nezávisle spustit jak původní, tak nové potrubí.
Tento článek obsahuje příklady volání požadavku rozhraní API přímo a prostřednictvím skriptu Pythonu z poznámkového bloku Databricks.
Než začnete
Před klonováním kanálu jsou potřeba následující:
Při klonování datového toku metastoru Hive musí tabulky a zobrazení definovaná v tomto toku publikovat tabulky do cílového schématu. Informace o přidání cílového schématu do kanálu najdete v tématu Konfigurace kanálu pro publikování do metastoru Hive.
Odkazy na spravované tabulky nebo zobrazení metastore Hive v rámci kanálu určeného ke klonování musí být plně kvalifikované pomocí katalogu (
hive_metastore
), schématu a názvu tabulky. Například v následujícím kódu, který vytváří datovou saducustomers
, musí být argument názvu tabulky aktualizován nahive_metastore.sales.customers
:@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
V průběhu operace klonování neupravujte zdrojový kód zdrojového kanálu metastoru Hive, včetně poznámkových bloků nakonfigurovaných jako součást kanálu a všech modulů uložených ve složkách Gitu nebo souborech pracovního prostoru.
Zdrojový kanál pro metastore Hive nesmí být spuštěn, když zahajujete operaci klonování. Pokud je aktualizace spuštěná, zastavte ji nebo počkejte na dokončení.
Před klonováním kanálu je potřeba vzít v úvahu následující důležité informace:
- Pokud tabulky v pipeline Hive metastore určují umístění úložiště pomocí argumentu
path
v Pythonu neboLOCATION
v SQL, předejte konfiguraci"pipelines.migration.ignoreExplicitPath": "true"
do požadavku klonu. Nastavení této konfigurace je součástí následujících pokynů. - Pokud kanál metastoru Hive obsahuje AutoLoader jako zdroj, určující hodnotu pro možnost
cloudFiles.schemaLocation
, a kanál metastoru Hive zůstane funkční i po vytvoření klonu katalogu Unity, musíte nastavit možnostmergeSchema
natrue
jak v kanálu metastoru Hive, tak v kanálu klonovaného katalogu Unity. Přidání této možnosti do kanálu metastoru Hive před klonováním zkopíruje možnost do nového kanálu.
Klonování potrubí pomocí rozhraní Databricks REST API
Následující příklad používá příkaz curl
k volání požadavku clone a pipeline
v rozhraní Databricks REST API:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
Nahradit:
-
<personal-access-token>
s osobním přístupovým tokenem pro Databricks. -
<databricks-instance>
s názvem instance pracovního prostoru Azure Databricks, napříkladadb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
s jedinečným identifikátorem kanálu metastoru Hive, který se má klonovat. ID pipelineu najdete v uživatelském rozhraní DLT.
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
Nahradit:
-
<target-catalog-name>
s názvem katalogu v Unity Catalog, do kterého by se měl nový pipeline publikovat. Musí se jednat o existující katalog. -
<target-schema-name>
s názvem schématu v katalogu Unity, do kterého se má nový kanál publikovat, pokud se liší od aktuálního názvu schématu. Tento parametr je nepovinný a pokud není zadaný, použije se název existujícího schématu. -
<new-pipeline-name>
s volitelným názvem nového potrubí. Pokud není zadaný, nový kanál se jmenuje pomocí názvu zdrojového kanálu s připojeným[UC]
.
clone_mode
určuje režim, který se má použít pro operaci klonování.
MIGRATE_TO_UC
je jedinou podporovanou možností.
Pomocí pole configuration
definujte konfigurace v novém potrubí. Hodnoty nastavení zde přepisují konfigurace v původním datovém toku.
Odpověď z požadavku rozhraní REST API clone
je ID pipeline nového pipeline Unity Catalogu.
Klonování datového kanálu z notebooku Databricks
Následující příklad volá požadavek create a pipeline
ze skriptu napsaného v Pythonu. Ke spuštění tohoto skriptu můžete použít poznámkový blok Databricks:
- Vytvořte nový poznámkový blok pro skript. Viz Vytvoření poznámkového bloku.
- Zkopírujte následující skript Pythonu do první buňky poznámkového bloku.
- Aktualizujte zástupné hodnoty ve skriptu nahrazením:
-
<databricks-instance>
s názvem instance pracovního prostoru Azure Databricks, napříkladadb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
s jedinečným identifikátorem kanálu metastoru Hive, který se má klonovat. ID pipeline najdete v uživatelském rozhraní DLT . -
<target-catalog-name>
s názvem katalogu v Unity Catalog, do kterého se má nový datový tok publikovat. Musí se jednat o existující katalog. -
<target-schema-name>
s názvem schématu v katalogu Unity, do kterého by se měl nový pipeline publikovat, pokud je odlišný od aktuálního názvu schématu. Tento parametr je nepovinný a pokud není zadaný, použije se název existujícího schématu. -
<new-pipeline-name>
s volitelným názvem nového potrubí. Pokud není zadaný, nový kanál se jmenuje pomocí názvu zdrojového kanálu s připojeným[UC]
.
-
- Spusťte skript. Podívejte se na jak spustit poznámkové bloky Databricks.
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
Omezení
Níže jsou uvedená omezení požadavku rozhraní DLT clone a pipeline
API:
- Podporuje se pouze klonování z kanálu nakonfigurovaného pro použití metastoru Hive do kanálu typu Unity Catalog.
- Klon můžete vytvořit pouze ve stejném pracovním prostoru Azure Databricks jako kanál, ze kterého klonujete.
- Pracovní postup, který klonujete, může obsahovat pouze následující streamovací zdroje:
- Delta zdroje
- Auto Loader, včetně všech zdrojů dat podporovaných Auto Loaderem. Viz Načtení souborů z cloudového úložiště objektů.
- Apached Kafka se strukturovaným streamováním Zdroj Kafka však nelze nakonfigurovat tak, aby používal možnost
kafka.group.id
. Viz zpracování streamů s využitím Apache Kafka a Azure Databricks. - Amazon Kinesis se strukturovaným streamováním Zdroj Kinesis však nelze nakonfigurovat tak, aby nastavil
consumerMode
naefo
.
- Pokud klonujete kanál metastoru Hive, který používá režim oznámení souborů funkce Auto Loader, Databricks doporučuje nespouštět tento kanál metastoru Hive po jeho klonování. Důvodem je to, že spuštění kanálu metastoru Hive způsobí vyřazení některých událostí oznámení o souboru z klonu katalogu Unity. Pokud se zdrojový kanál metastoru Hive po dokončení operace klonování spustí, můžete chybějící soubory doplnit pomocí Auto Loaderu s volbou
cloudFiles.backfillInterval
. Informace o režimu oznámení souboru Auto Loader najdete v tématu Co je režim oznámení souboru Auto Loader?. Další informace o doplňování souborů pomocí Automatického Zavaděče najdete v tématu Aktivace běžných backfillů pomocí cloudFiles.backfillInterval a Běžné možnosti Automatického Zavaděče. - Úlohy údržby kanálu se během klonování automaticky pozastaví pro oba kanály.
- Následující platí pro dotazy týkající se časového cestování tabulek v naklonovaném pipeline Unity Catalogu:
- Pokud byla verze tabulky původně zapsána do spravovaného objektu metastoru Hive, dotazy na čas cestování pomocí klauzule
timestamp_expression
nejsou při dotazování klonovaného objektu katalogu Unity definovány. - Pokud se však verze tabulky zapíše do klonovaného objektu katalogu Unity, správně fungují dotazy na časovou cestu pomocí klauzule
timestamp_expression
. - Dotazy na časovou cestu pomocí klauzule
version
fungují správně při dotazování na klonovaný objekt katalogu Unity, i když byla verze původně zapsána do spravovaného objektu metastoru Hive.
- Pokud byla verze tabulky původně zapsána do spravovaného objektu metastoru Hive, dotazy na čas cestování pomocí klauzule
- Další omezení při použití DLT s Unity Catalog najdete v části omezení kanálu Unity Catalog.
- Více informací o omezeních katalogu Unity najdete v části omezení katalogu Unity.