藉由複製 Hive 中繼存放區管線來建立 Unity 目錄管線
本文說明 Databricks REST API 中的 clone a pipeline
要求,以及如何使用它將發行至 Hive 中繼存放區的現有管線複製到發佈至 Unity 目錄的新管線。 當您呼叫 clone a pipeline
要求時,它會:
- 將原始程式碼和組態從現有的管線複製到新的管線,並套用您指定的任何組態覆寫。
- 更新具現化檢視和流式表的定義和參考,以進行必要的變更,使這些物件能夠由 Unity Catalog 管理。
- 啟動管線更新,以移轉管線中任何串流數據表的現有數據和元數據,例如檢查點。 這可讓這些串流數據表在與原始管線相同的時間點繼續處理。
複製作業完成之後,原始管線和新管線都可以獨立執行。
本文包含直接從 Databricks 筆記本透過 Python 腳本呼叫 API 要求的範例。
開始之前
複製管線之前需要下列事項:
若要複製Hive中繼存放區管線,管線中定義的數據表和檢視表必須將數據表發佈至目標架構。 若要瞭解如何將目標架構新增至管線,請參閱 設定管線以發佈至 Hive 中繼存放區。
在管線中要進行複製的 Hive 中繼存放區管理的數據表或檢視的引用,必須具備完整的目錄(
hive_metastore
)、架構和數據表名稱。 例如,在下列建立customers
數據集的程式代碼中,數據表名稱自變數必須更新為hive_metastore.sales.customers
:@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
複製作業進行時,請勿編輯來源 Hive 中繼存放區管線的原始程式碼,包括設定為管線一部分的筆記本,以及儲存在 Git 資料夾或工作區檔案中的任何模組。
當您啟動複製作業時,Hive 來源元資料存儲管線不得執行。 如果更新正在執行,請停止更新,或等候更新完成。
以下是在克隆管線之前的其他重要考慮:
- 如果Hive中繼存放區管線中的數據表使用 Python 中的
path
自變數或 SQL 中的LOCATION
來指定儲存位置,請將"pipelines.migration.ignoreExplicitPath": "true"
組態傳遞至複製要求。 設定此組態包含在下列指示中。 - 如果 Hive 中繼存放區管線包含指定 [
cloudFiles.schemaLocation
] 選項值的自動載入器來源,且在建立 Unity Catalog 複本後 Hive 中繼存放區管線仍能正常運行,您必須在 Hive 中繼存放區管線和複本的 Unity Catalog 管線中將 [mergeSchema
] 選項設定為true
。 在複製之前將此選項新增至 Hive 中繼存放區管線,會將該選項複製到新的管線。
使用 Databricks REST API 複製管線
下列範例會使用 curl
命令在 Databricks REST API 中呼叫 clone a pipeline
要求:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
取代:
-
<personal-access-token>
搭配 Databricks 個人存取權杖。 -
<databricks-instance>
Azure Databricks 工作區實例名稱,例如adb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
具有要複製之 Hive 中繼存放區管線的唯一標識碼。 您可以在 Delta Live Tables UI中找到管線標識碼。
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
取代:
- 在 Unity Catalog 中的某個目錄,即
<target-catalog-name>
,是新管線應發布的對象。 這必須是一個現有的目錄。 -
<target-schema-name>
與目前結構名稱不同的情況下,新管線應該發佈到 Unity Catalog 中的模式名稱。 此參數是選擇性的,如果未指定,則會使用現有的架構名稱。 -
<new-pipeline-name>
具有新管線的選擇性名稱。 如果未指定,則會使用附加[UC]
的來源管線名稱來命名新的管線。
clone_mode
指定要用於複製作業的模式。
MIGRATE_TO_UC
是唯一支持的選項。
使用 [configuration
] 字段來指定新管線的組態。 此處設定的值會覆寫原始管線中的組態。
來自 clone
REST API 要求的回應是新 Unity 目錄管線的管線標識碼。
從 Databricks 筆記本(或 Notebook)中複製一個管線
下列範例會透過 Python 腳本來呼叫 create a pipeline
請求。 您可以使用 Databricks 筆記本來執行此文稿:
- 為該腳本建立一個新筆記本。 請參閱 建立筆記本。
- 將下列 Python 腳本複製到筆記本第一個儲存格。
- 藉由取代下列內容來更新腳本中的佔位符值:
-
<databricks-instance>
Azure Databricks 工作區實例名稱,例如adb-1234567890123456.7.azuredatabricks.net
-
<pipeline-id>
具有要複製之 Hive 中繼存放區管線的唯一標識碼。 您可以在 Delta Live Tables UI中找到管線標識碼。 - 在 Unity Catalog 中的某個目錄,即
<target-catalog-name>
,是新管線應發布的對象。 這必須是一個現有的目錄。 -
<target-schema-name>
與目前結構名稱不同的情況下,新管線應該發佈到 Unity Catalog 中的模式名稱。 此參數是選擇性的,如果未指定,則會使用現有的架構名稱。 -
<new-pipeline-name>
具有新管線的選擇性名稱。 如果未指定,則會使用附加[UC]
的來源管線名稱來命名新的管線。
-
- 執行腳本。 請參閱執行 Databricks 筆記本。
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
局限性
以下是 Delta 即時表 clone a pipeline
API 請求的限制:
- 僅支援從設定為使用Hive中繼存放區到 Unity 目錄管線的管線複製。
- 您只能在與您要複製的管線相同的 Azure Databricks 工作區中建立複製品。
- 您要複製的管線只能包含下列串流來源:
- 變動來源
- 自動載入器,包括自動載入器支援的任何數據源。 請參閱 從雲端物件記憶體載入檔案。
- Apache Kafka 結合結構化串流。 不過,Kafka 來源無法設定為使用
kafka.group.id
選項。 請參閱 使用 Apache Kafka 和 Azure Databricks 進行串流處理。 - Amazon Kinesis 的結構化串流。 不過,Kinesis 來源無法設定為將
consumerMode
設定為efo
。
- 如果您要複製的 Hive 中繼資料庫管線使用了 Auto Loader 檔案通知模式,Databricks 建議您在複製後不要再執行此 Hive 中繼資料庫管線。 這是因為執行 Hive 中繼存放區管線會導致在 Unity Catalog 複製本中捨棄一些檔案通知事件。 如果來源 Hive 中繼存放區管線在複製作業完成之後執行,您可以使用 [自動載入器] 搭配 [
cloudFiles.backfillInterval
] 選項來回填遺漏的檔案。 若要了解自動載入器檔案通知模式,請參閱 什麼是自動載入器檔案通知模式?。 若要瞭解如何使用自動載入器回填檔案,請參閱 使用 cloudFiles.backfillInterval 觸發一般回填,以及 一般自動載入器選項。 - 當複製進行中時,涉及的兩個管線的維護工作會自動暫停。
- 下列適用於針對複製 Unity 目錄管線中的資料表進行時間移動查詢:
- 如果資料表版本最初是寫入到 Hive 中繼資料庫受控物件中,那麼在查詢複製的 Unity Catalog 物件時,使用
timestamp_expression
子句的時間旅行查詢將是未定義的。 - 不過,如果表格版本已寫入到複製的 Unity Catalog 對象,那麼使用
timestamp_expression
子句的時光旅行查詢將能正常運作。 - 使用
version
子句的時間旅行查詢在查詢複製的 Unity Catalog 物件時正常運作,即使該版本最初是寫入由 Hive 中繼資料庫管理的物件,也能正常運作。
- 如果資料表版本最初是寫入到 Hive 中繼資料庫受控物件中,那麼在查詢複製的 Unity Catalog 物件時,使用
- 如需了解將 Delta Live Tables 與 Unity Catalog 搭配使用時的其他限制,請參閱 Unity Catalog 工作流程限制。
- 如需 Unity 目錄限制,請參閱 Unity 目錄限制。