Condividi tramite


Creare una pipeline di Unity Catalog clonando una pipeline del metastore Hive

Questo articolo descrive la richiesta di clone a pipeline nell'API REST di Databricks e come usarla per copiare una pipeline esistente che pubblica nel metastore Hive in una nuova pipeline che pubblica in Unity Catalog. Quando si richiama la richiesta clone a pipeline, ciò che succede è:

  • Copia il codice sorgente e la configurazione dalla pipeline esistente in una nuova, applicando qualsiasi override di configurazione specificato.
  • Aggiorna le definizioni di vista materializzata e le definizioni di tabella di streaming e i riferimenti con le modifiche necessarie per la gestione di tali oggetti da gestire da Unity Catalog.
  • Avvia un aggiornamento della pipeline per eseguire la migrazione dei dati e dei metadati esistenti, ad esempio i checkpoint, per qualsiasi tabella di streaming nella pipeline. Ciò consente alle tabelle di streaming di riprendere l'elaborazione nello stesso punto della pipeline originale.

Al termine dell'operazione di clonazione, sia le pipeline originali che quelle nuove possono essere eseguite in modo indipendente.

Questo articolo include esempi di richiesta API effettuata direttamente e utilizzando uno script Python da un notebook di Databricks.

Prima di iniziare

Prima di clonare una pipeline, è necessario quanto segue:

  • Per clonare una pipeline del metastore Hive, le tabelle e le viste definite nella pipeline devono essere pubblicate in uno schema di destinazione. Per informazioni su come aggiungere uno schema di destinazione a una pipeline, vedere Configurare una pipeline per la pubblicazione nel metastore Hive.

  • I riferimenti alle tabelle o alle viste gestite del metastore Hive nella pipeline da clonare devono essere completamente qualificati con il catalogo (hive_metastore), lo schema e il nome della tabella. Ad esempio, nel codice seguente che crea un set di dati customers, l'argomento nome tabella deve essere aggiornato a hive_metastore.sales.customers:

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • Non modificare il codice sorgente della pipeline del metastore Hive di origine durante un'operazione di clonazione, inclusi i notebook configurati come parte della pipeline e i moduli memorizzati in cartelle Git o in file dell'area di lavoro.

  • La pipeline del metastore Hive di origine non deve essere in esecuzione quando si avvia l'operazione di clonazione. Se un aggiornamento è in esecuzione, arrestarlo o attendere il completamento.

Di seguito sono riportate altre considerazioni importanti prima della clonazione di una pipeline:

  • Se le tabelle nella pipeline del metastore Hive specificano un percorso di archiviazione usando l'argomento path in Python o LOCATION in SQL, passare la configurazione "pipelines.migration.ignoreExplicitPath": "true" alla richiesta di clonazione. L'impostazione di questa configurazione è inclusa nelle istruzioni seguenti.
  • Se la pipeline del metastore di Hive include un'origine del caricatore automatico che specifica un valore per l'opzione cloudFiles.schemaLocation e la pipeline del metastore di Hive rimane operativa dopo la creazione del clone di Unity Catalog, è necessario impostare l'opzione mergeSchema su true sia nella pipeline del metastore di Hive che nella pipeline clonata di Unity Catalog. Aggiungendo questa opzione alla pipeline del metastore Hive prima della clonazione, l'opzione verrà copiata nella nuova pipeline.

Clonare una pipeline con l'API REST di Databricks

L'esempio seguente usa il comando curl per chiamare la richiesta di clone a pipeline nell'API REST di Databricks:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Sostituire:

  • <personal-access-token> con un token personale di accesso Databricks .
  • <databricks-instance> con il nome dell'istanza dell'area di lavoro di Azure Databricks , ad esempio adb-1234567890123456.7.azuredatabricks.net
  • <pipeline-id> con l'identificatore univoco della pipeline del metastore Hive da clonare. È possibile trovare l'ID della pipeline nell'interfaccia utente di Delta Live Tables.

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Sostituire:

  • <target-catalog-name> con il nome di un catalogo in Unity Catalog in cui deve essere pubblicata la nuova pipeline. Deve essere un catalogo esistente.
  • <target-schema-name> con il nome di uno schema in Unity Catalog in cui la nuova pipeline deve pubblicare se è diversa dal nome dello schema corrente. Questo parametro è facoltativo e, se non specificato, viene usato il nome dello schema esistente.
  • <new-pipeline-name> con un nome opzionale per la nuova pipeline. Se non specificato, la nuova pipeline viene denominata utilizzando il nome della pipeline di origine con [UC] aggiunto.

clone_mode specifica la modalità da usare per l'operazione di clonazione. MIGRATE_TO_UC è l'unica opzione supportata.

Usare il campo configuration per specificare le configurazioni nella nuova pipeline. I valori impostati qui sostituiscono le configurazioni nella pipeline originale.

La risposta alla richiesta dell'API REST clone è l'ID della nuova pipeline di Unity Catalog.

Clonare una pipeline da un notebook di Databricks

L'esempio seguente chiama la richiesta create a pipeline da uno script Python. È possibile usare un notebook di Databricks per eseguire questo script:

  1. Creare un nuovo notebook per lo script. Guarda Crea un notebook.
  2. Copiare lo script Python seguente nella prima cella del notebook.
  3. Aggiornare i valori segnaposto nello script sostituendo:
    • <databricks-instance> con il nome dell'istanza dell'area di lavoro di Azure Databricks , ad esempio adb-1234567890123456.7.azuredatabricks.net
    • <pipeline-id> con l'identificatore univoco della pipeline del metastore Hive da clonare. È possibile trovare l'ID della pipeline nell'interfaccia utente di Delta Live Tables.
    • <target-catalog-name> con il nome di un catalogo in Unity Catalog in cui deve essere pubblicata la nuova pipeline. Deve essere un catalogo esistente.
    • <target-schema-name> con il nome di uno schema in Unity Catalog in cui la nuova pipeline deve pubblicare se è diversa dal nome dello schema corrente. Questo parametro è facoltativo e, se non specificato, viene usato il nome dello schema esistente.
    • <new-pipeline-name> con un nome opzionale per la nuova pipeline. Se non specificato, la nuova pipeline viene denominata utilizzando il nome della pipeline di origine con [UC] aggiunto.
  4. Esegui lo script. Consulta eseguire i notebook di Databricks.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Limitazioni

Di seguito sono riportate le limitazioni della richiesta API Delta Live Tables clone a pipeline:

  • È supportata solo la clonazione di una pipeline configurata per utilizzare il metastore Hive in una pipeline del catalogo Unity.
  • È possibile creare un clone solo nella stessa area di lavoro di Azure Databricks della pipeline da cui si sta clonando.
  • La pipeline che si sta clonando può includere solo le origini di streaming seguenti:
  • Se la pipeline del metastore Hive che stai clonando utilizza la modalità di notifica dei file di Auto Loader, Databricks consiglia di non eseguire la pipeline del metastore Hive dopo la clonazione. Ciò è dovuto al fatto che l'esecuzione della pipeline del metastore Hive comporta l'eliminazione di alcuni eventi di notifica dei file dal clone del catalogo Unity. Se la pipeline del metastore Hive di origine viene eseguita dopo il completamento dell'operazione di clonazione, è possibile recuperare i file mancanti usando Auto Loader con l'opzione cloudFiles.backfillInterval. Per informazioni sulla modalità di notifica file di Auto Loader, vedere Che cos'è la modalità di notifica file di Auto Loader?. Per informazioni sul backfilling dei file con Auto Loader, vedere Attivare i backfill regolari usando cloudFiles.backfillInterval e le opzioni comuni di Auto Loader.
  • Le attività di manutenzione della pipeline vengono sospese automaticamente per entrambe le pipeline mentre è in corso la clonazione.
  • Di seguito si applicano le query di spostamento temporale sulle tabelle nella pipeline clonata del catalogo Unity:
    • Se una versione di tabella è stata originariamente scritta in un oggetto gestito da metastore Hive, le query di spostamento temporale che usano una clausola timestamp_expression non sono predefinite durante l'esecuzione di query sull'oggetto Catalogo Unity clonato.
    • Tuttavia, se la versione della tabella è stata scritta nell'oggetto Catalog di Unity clonato, le query di spostamento temporale usando una clausola timestamp_expression funzionano correttamente.
    • Le query di spostamento temporale che usano una clausola version funzionano correttamente quando si esegue una query su un oggetto Catalogo Unity clonato, anche quando la versione è stata originariamente scritta nell'oggetto gestito del metastore Hive.
  • Per altre limitazioni quando si usano le Tabelle Delta Live con Unity Catalog, vedere limitazioni della pipeline di Unity Catalog.
  • Per le limitazioni del Catalogo Unity, vedere Limitazioni del Catalogo Unity.