Create a Unity Catalog pipeline by cloning a Hive metastore pipeline

Important

The clone a pipeline request in the Delta Live Tables REST API is in Public Preview.

This article describes the clone a pipeline request in the Databricks REST API and how you can use it to copy an existing pipeline that publishes to the Hive metastore to a new pipeline that publishes to Unity Catalog. When you call the clone a pipeline request, it:

  • Copies the source code and configuration from the existing pipeline to a new one, applying any configuration overrides you’ve specified.
  • Updates materialized view and streaming table definitions and references with the changes required for those objects to be managed by Unity Catalog.
  • Starts a pipeline update to migrate the existing data and metadata, such as checkpoints, for any streaming tables in the pipeline. This allows those streaming tables to resume processing at the same point as the original pipeline.

After the clone operation is complete, both the original and new pipelines can run independently.

This article includes examples of calling the API request directly and through a Python script from a Databricks notebook.

Before you begin

The following are required before cloning a pipeline:

  • To clone a Hive metastore pipeline, the pipeline must publish its tables and views to a target schema. To learn how to add a target schema to a pipeline, see How to publish Delta Live Tables datasets to the legacy Hive metastore.

  • References to Hive metastore managed tables or views in the pipeline to clone must be fully qualified with the catalog (hive_metastore), schema, and table name. For example, in the following code creating a customers dataset, the table name argument must be updated to hive_metastore.sales.customers (a corrected version is sketched after this list):

    import dlt

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • Do not edit the source code for the source Hive metastore pipeline while a clone operation is in progress, including notebooks configured as part of the pipeline and any modules stored in Git folders or workspace files.

  • The source Hive metastore pipeline must not be running when you start the clone operation. If an update is running, stop it or wait for it to complete.
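
For reference, the following is a minimal sketch of the customers example above after the update, with the table reference fully qualified (the filter logic is elided, as in the original snippet):

import dlt

@dlt.table
def customers():
  # Fully qualified: catalog (hive_metastore), schema (sales), and table (customers)
  return spark.read.table("hive_metastore.sales.customers").where(...)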

The following are other important considerations before cloning a pipeline:

  • If tables in the Hive metastore pipeline specify a storage location using the path argument in Python or LOCATION in SQL, pass the "pipelines.migration.ignoreExplicitPath": "true" configuration to the clone request. Setting this configuration is included in the instructions below.
  • If the Hive metastore pipeline includes an Auto Loader source that specifies a value for the cloudFiles.schemaLocation option, and the Hive metastore pipeline will remain operational after creating the Unity Catalog clone, you must set the mergeSchema option to true in both the Hive metastore pipeline and the cloned Unity Catalog pipeline. Adding this option to the Hive metastore pipeline before cloning copies the option to the new pipeline (see the sketch after this list).
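
The following is a minimal sketch of an Auto Loader source with both options set; the schema location path and source directory are hypothetical placeholders:

import dlt

@dlt.table
def raw_events():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            # Existing schema tracking location for this Auto Loader source
            .option("cloudFiles.schemaLocation", "/tmp/schemas/raw_events")
            # Required in both pipelines if the Hive metastore pipeline keeps running after the clone
            .option("mergeSchema", "true")
            .load("/tmp/data/raw_events")
    )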

Clone a pipeline with the Databricks REST API

The following example uses the curl command to call the clone a pipeline request in the Databricks REST API:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>" \
     "https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone" \
     --data @clone-pipeline.json

Replace:

  • <personal-access-token> with a Databricks personal access token.
  • <databricks-instance> with the Azure Databricks workspace instance name (for example, adb-1234567890123456.7.azuredatabricks.net).
  • <pipeline-id> with the unique identifier of the Hive metastore pipeline to clone. You can find the pipeline ID in the Delta Live Tables UI.

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>",
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Replace:

  • <target-catalog-name> with the name of a catalog in Unity Catalog to which the new pipeline should publish. This must be an existing catalog.
  • <target-schema-name> with the name of a schema in Unity Catalog to which the new pipeline should publish if it’s different than the current schema name. This parameter is optional, and if not specified, the existing schema name is used.
  • <new-pipeline-name> with an optional name for the new pipeline. If not specified, the new pipeline is named using the source pipeline name with [UC] appended.

clone_mode specifies the mode to use for the clone operation. MIGRATE_TO_UC is the only supported option.

Use the configuration field to specify configurations on the new pipeline. The values set here override configurations in the original pipeline.

The response from the clone REST API request is the pipeline ID of the new Unity Catalog pipeline.
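
For example, a successful request returns a small JSON document containing the ID of the new pipeline. The field name and ID value shown here are illustrative:

{
  "pipeline_id": "a1b2c3d4-5678-90ab-cdef-1234567890ab"
}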

Clone a pipeline from a Databricks notebook

The following example calls the clone a pipeline request from a Python script. You can use a Databricks notebook to run this script:

  1. Create a new notebook for the script. See Create a notebook.
  2. Copy the following Python script into the first cell of the notebook.
  3. Update the placeholder values in the script by replacing:
    • <databricks-instance> with the Azure Databricks workspace URL, including https://, for example https://adb-1234567890123456.7.azuredatabricks.net.
    • <pipeline-id> with the unique identifier of the Hive metastore pipeline to clone. You can find the pipeline ID in the Delta Live Tables UI.
    • <target-catalog-name> with the name of a catalog in Unity Catalog to which the new pipeline should publish. This must be an existing catalog.
    • <target-schema-name> with the name of a schema in Unity Catalog to which the new pipeline should publish if it’s different than the current schema name. This parameter is optional, and if not specified, the existing schema name is used.
    • <new-pipeline-name> with an optional name for the new pipeline. If not specified, the new pipeline is named using the source pipeline name with [UC] appended.
  4. Run the script. See Run Databricks notebooks.
import requests

# Your Databricks workspace URL, including https:// and with no trailing slash
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode in this preview
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    # Get the API token for the current notebook session
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    # Verify that the source pipeline exists before requesting the clone
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json().get("pipeline_id") == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    # Build the clone request payload, omitting optional fields left empty
    payload = {
        "catalog": TARGET_CATALOG,
        "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
        payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
        payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
        payload["configuration"] = OVERRIDE_CONFIGS

    # Call the clone a pipeline request and return the JSON response
    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    return data.json()

check_source_pipeline_exists()
request_pipeline_clone()
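
As with the direct REST API call, the JSON returned by request_pipeline_clone includes the pipeline ID of the new Unity Catalog pipeline.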

Limitations

The following are limitations of the Delta Live Tables clone a pipeline API request:

  • Only cloning from a pipeline configured to use the Hive metastore to a Unity Catalog pipeline is supported.
  • You can create a clone only in the same Azure Databricks workspace as the pipeline you’re cloning from.
  • The pipeline you’re cloning can include only supported streaming sources.
  • If the Hive metastore pipeline you’re cloning uses Auto Loader file notification mode, Databricks recommends not running the Hive metastore pipeline after cloning, because doing so drops some file notification events from the Unity Catalog clone. If the source Hive metastore pipeline does run after the clone operation completes, you can backfill missing files using Auto Loader with the cloudFiles.backfillInterval option. To learn about Auto Loader file notification mode, see What is Auto Loader file notification mode?. To learn about backfilling files with Auto Loader, see Trigger regular backfills using cloudFiles.backfillInterval and Common Auto Loader options.
  • Pipeline maintenance tasks are automatically paused for both pipelines while cloning is in progress.
  • The following applies to time travel queries against tables in the cloned Unity Catalog pipeline (see the sketch after this list):
    • If a table version was originally written to a Hive metastore managed object, time travel queries using a timestamp_expression clause are undefined when querying the cloned Unity Catalog object.
    • However, if the table version was written to the cloned Unity Catalog object, time travel queries using a timestamp_expression clause work correctly.
    • Time travel queries using a version clause work correctly when querying a cloned Unity Catalog object, even when the version was originally written to the Hive metastore managed object.
  • For other limitations when using Delta Live Tables with Unity Catalog, see Unity Catalog pipeline limitations.
  • For Unity Catalog limitations, see Unity Catalog limitations.
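
To illustrate the time travel behavior described above, the following sketch shows both query styles against a cloned table; the catalog, schema, table name, version, and timestamp are hypothetical placeholders:

# Time travel by version: works against the cloned Unity Catalog object, even for
# versions originally written to the Hive metastore managed object.
spark.sql("SELECT * FROM main.sales.customers VERSION AS OF 3").show()

# Time travel by timestamp: defined only for versions written to the cloned Unity
# Catalog object; undefined for versions originally written to the Hive metastore object.
spark.sql("SELECT * FROM main.sales.customers TIMESTAMP AS OF '2024-01-01'").show()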