共用方式為


藉由複製 Hive 中繼存放區管線來建立 Unity 目錄管線

本文說明 Databricks REST API 中的 clone a pipeline 要求,以及如何使用它將發行至 Hive 中繼存放區的現有管線複製到發佈至 Unity 目錄的新管線。 當您呼叫 clone a pipeline 要求時,它會:

  • 將原始程式碼和組態從現有的管線複製到新的管線,並套用您指定的任何組態覆寫。
  • 更新具現化檢視和流式表的定義和參考,以進行必要的變更,使這些物件能夠由 Unity Catalog 管理。
  • 啟動管線更新,以移轉管線中任何串流數據表的現有數據和元數據,例如檢查點。 這可讓這些串流數據表在與原始管線相同的時間點繼續處理。

複製作業完成之後,原始管線和新管線都可以獨立執行。

本文包含直接從 Databricks 筆記本透過 Python 腳本呼叫 API 要求的範例。

開始之前

複製管線之前需要下列事項:

  • 若要複製Hive中繼存放區管線,管線中定義的數據表和檢視表必須將數據表發佈至目標架構。 若要瞭解如何將目標架構新增至管線,請參閱 設定管線以發佈至 Hive 中繼存放區

  • 在管線中要進行複製的 Hive 中繼存放區管理的數據表或檢視的引用,必須具備完整的目錄(hive_metastore)、架構和數據表名稱。 例如,在下列建立 customers 數據集的程式代碼中,數據表名稱自變數必須更新為 hive_metastore.sales.customers

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • 複製作業進行時,請勿編輯來源 Hive 中繼存放區管線的原始程式碼,包括設定為管線一部分的筆記本,以及儲存在 Git 資料夾或工作區檔案中的任何模組。

  • 當您啟動複製作業時,Hive 來源元資料存儲管線不得執行。 如果更新正在執行,請停止更新,或等候更新完成。

以下是在克隆管線之前的其他重要考慮:

  • 如果Hive中繼存放區管線中的數據表使用 Python 中的 path 自變數或 SQL 中的 LOCATION 來指定儲存位置,請將 "pipelines.migration.ignoreExplicitPath": "true" 組態傳遞至複製要求。 設定此組態包含在下列指示中。
  • 如果 Hive 中繼存放區管線包含指定 [cloudFiles.schemaLocation] 選項值的自動載入器來源,且在建立 Unity Catalog 複本後 Hive 中繼存放區管線仍能正常運行,您必須在 Hive 中繼存放區管線和複本的 Unity Catalog 管線中將 [mergeSchema] 選項設定為 true。 在複製之前將此選項新增至 Hive 中繼存放區管線,會將該選項複製到新的管線。

使用 Databricks REST API 複製管線

下列範例會使用 curl 命令在 Databricks REST API 中呼叫 clone a pipeline 要求:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

取代:

  • <personal-access-token> 搭配 Databricks 個人存取權杖
  • <databricks-instance> Azure Databricks 工作區實例名稱,例如 adb-1234567890123456.7.azuredatabricks.net
  • <pipeline-id> 具有要複製之 Hive 中繼存放區管線的唯一標識碼。 您可以在 Delta Live Tables UI中找到管線標識碼。

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

取代:

  • 在 Unity Catalog 中的某個目錄,即 <target-catalog-name>,是新管線應發布的對象。 這必須是一個現有的目錄。
  • <target-schema-name> 與目前結構名稱不同的情況下,新管線應該發佈到 Unity Catalog 中的模式名稱。 此參數是選擇性的,如果未指定,則會使用現有的架構名稱。
  • <new-pipeline-name> 具有新管線的選擇性名稱。 如果未指定,則會使用附加 [UC] 的來源管線名稱來命名新的管線。

clone_mode 指定要用於複製作業的模式。 MIGRATE_TO_UC 是唯一支持的選項。

使用 [configuration] 字段來指定新管線的組態。 此處設定的值會覆寫原始管線中的組態。

來自 clone REST API 要求的回應是新 Unity 目錄管線的管線標識碼。

從 Databricks 筆記本(或 Notebook)中複製一個管線

下列範例會透過 Python 腳本來呼叫 create a pipeline 請求。 您可以使用 Databricks 筆記本來執行此文稿:

  1. 為該腳本建立一個新筆記本。 請參閱 建立筆記本
  2. 將下列 Python 腳本複製到筆記本第一個儲存格。
  3. 藉由取代下列內容來更新腳本中的佔位符值:
    • <databricks-instance> Azure Databricks 工作區實例名稱,例如 adb-1234567890123456.7.azuredatabricks.net
    • <pipeline-id> 具有要複製之 Hive 中繼存放區管線的唯一標識碼。 您可以在 Delta Live Tables UI中找到管線標識碼。
    • 在 Unity Catalog 中的某個目錄,即 <target-catalog-name>,是新管線應發布的對象。 這必須是一個現有的目錄。
    • <target-schema-name> 與目前結構名稱不同的情況下,新管線應該發佈到 Unity Catalog 中的模式名稱。 此參數是選擇性的,如果未指定,則會使用現有的架構名稱。
    • <new-pipeline-name> 具有新管線的選擇性名稱。 如果未指定,則會使用附加 [UC] 的來源管線名稱來命名新的管線。
  4. 執行腳本。 請參閱執行 Databricks 筆記本
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

局限性

以下是 Delta 即時表 clone a pipeline API 請求的限制:

  • 僅支援從設定為使用Hive中繼存放區到 Unity 目錄管線的管線複製。
  • 您只能在與您要複製的管線相同的 Azure Databricks 工作區中建立複製品。
  • 您要複製的管線只能包含下列串流來源:
  • 如果您要複製的 Hive 中繼資料庫管線使用了 Auto Loader 檔案通知模式,Databricks 建議您在複製後不要再執行此 Hive 中繼資料庫管線。 這是因為執行 Hive 中繼存放區管線會導致在 Unity Catalog 複製本中捨棄一些檔案通知事件。 如果來源 Hive 中繼存放區管線在複製作業完成之後執行,您可以使用 [自動載入器] 搭配 [cloudFiles.backfillInterval] 選項來回填遺漏的檔案。 若要了解自動載入器檔案通知模式,請參閱 什麼是自動載入器檔案通知模式?。 若要瞭解如何使用自動載入器回填檔案,請參閱 使用 cloudFiles.backfillInterval 觸發一般回填,以及 一般自動載入器選項
  • 當複製進行中時,涉及的兩個管線的維護工作會自動暫停。
  • 下列適用於針對複製 Unity 目錄管線中的資料表進行時間移動查詢:
    • 如果資料表版本最初是寫入到 Hive 中繼資料庫受控物件中,那麼在查詢複製的 Unity Catalog 物件時,使用 timestamp_expression 子句的時間旅行查詢將是未定義的。
    • 不過,如果表格版本已寫入到複製的 Unity Catalog 對象,那麼使用 timestamp_expression 子句的時光旅行查詢將能正常運作。
    • 使用 version 子句的時間旅行查詢在查詢複製的 Unity Catalog 物件時正常運作,即使該版本最初是寫入由 Hive 中繼資料庫管理的物件,也能正常運作。
  • 如需了解將 Delta Live Tables 與 Unity Catalog 搭配使用時的其他限制,請參閱 Unity Catalog 工作流程限制
  • 如需 Unity 目錄限制,請參閱 Unity 目錄限制