Partilhar via


Criar uma linha de processos no Unity Catalog ao clonar uma linha de processos do metastore do Hive

Este artigo descreve a solicitação de clone a pipeline na API REST do Databricks e como você pode usá-la para copiar um pipeline existente que publica no metastore do Hive para um novo pipeline que publica no Unity Catalog. Quando você liga para a clone a pipeline solicitação, ele:

  • Copia o código-fonte e a configuração do pipeline existente para um novo, aplicando quaisquer alterações de configuração especificadas.
  • Atualiza as definições e referências da tabela de exibição materializada e Streaming com as alterações necessárias para que esses objetos sejam gerenciados pelo Unity Catalog.
  • Inicia uma atualização de pipeline para migrar os dados e metadados existentes, como pontos de verificação, para quaisquer tabelas de Streaming no pipeline. Isso permite que essas tabelas de streaming retomem o processamento no mesmo ponto do pipeline original.

Após a conclusão da operação de clonagem, o pipeline original e o novo podem ser executados de forma independente.

Este artigo inclui exemplos de como chamar a solicitação de API diretamente e por meio de um script Python de um bloco de anotações Databricks.

Antes de começar

O seguinte é necessário antes de clonar um pipeline:

  • Para clonar um pipeline de metastore do Hive, as tabelas e exibições definidas no pipeline devem publicar tabelas em um esquema de destino. Para saber mais sobre como adicionar um esquema de destino a um pipeline, consulte Configurar um pipeline para publicar no Hive metastore.

  • As referências a tabelas ou exibições gerenciadas pelo metastore do Hive no pipeline para clonagem devem ser totalmente qualificadas com o catálogo (hive_metastore), esquema e nome da tabela. Por exemplo, no código a seguir criando um conjunto de dados customers, o argumento nome da tabela deve ser atualizado para hive_metastore.sales.customers:

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • Não edite o código-fonte do pipeline de metastore do Hive de origem enquanto uma operação de clone estiver em andamento, incluindo notebooks configurados como parte do pipeline e quaisquer módulos armazenados em pastas Git ou arquivos de espaço de trabalho.

  • O pipeline de metastore do Hive de origem não deve estar em execução quando você inicia a operação de clonagem. Se uma atualização estiver em execução, interrompa-a ou aguarde até que seja concluída.

A seguir estão outras considerações importantes antes de clonar um pipeline:

  • Se as tabelas no pipeline de metastore do Hive especificarem um local de armazenamento usando o argumento path em Python ou LOCATION em SQL, passe a configuração "pipelines.migration.ignoreExplicitPath": "true" para a solicitação de clone. A definição desta configuração está incluída nas instruções abaixo.
  • Se o pipeline de metastore do Hive incluir uma fonte de carregador automático que especifique um valor para a opção cloudFiles.schemaLocation e o pipeline de metastore do Hive permanecerá operacional após a criação do clone do Unity Catalog, você deverá definir a opção mergeSchema para true no pipeline de metastore do Hive e no pipeline clonado do Unity Catalog. Adicionar essa opção ao pipeline de metastore do Hive antes da clonagem copiará a opção para o novo pipeline.

Clonar um pipeline com a API REST do Databricks

O exemplo a seguir usa o comando curl para chamar a solicitação clone a pipeline na API REST do Databricks:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Substituir:

  • <personal-access-token> com um token de acesso pessoal Databricks .
  • <databricks-instance> com o nome da instância do espaço de trabalho do Azure Databricks , por exemplo adb-1234567890123456.7.azuredatabricks.net
  • <pipeline-id> com o identificador exclusivo do Hive metastore pipeline para clonar. Você pode encontrar o ID do pipeline na interface do utilizador do Delta Live Tables.

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Substituir:

  • <target-catalog-name> com o nome de um catálogo no Unity Catalog no qual o novo pipeline deve publicar. Este deve ser um catálogo existente.
  • <target-schema-name> com o nome de um esquema no Unity Catalog no qual o novo pipeline deve publicar caso seja diferente do nome do esquema atual. Esse parâmetro é opcional e, se não for especificado, o nome do esquema existente será usado.
  • <new-pipeline-name> com um nome opcional para o novo pipeline. Se não for especificado, o novo pipeline será nomeado usando o nome do pipeline de origem com [UC] anexado.

clone_mode especifica o modo a ser usado para a operação de clone. MIGRATE_TO_UC é a única opção suportada.

Use o campo configuration para especificar configurações no novo pipeline. Os valores definidos aqui substituem as configurações no pipeline original.

A resposta da solicitação de API REST do clone é o identificador do novo pipeline do Catálogo Unity.

Clone um pipeline a partir de um notebook Databricks

O exemplo a seguir executa a solicitação create a pipeline a partir de um script em Python. Você pode usar um bloco de anotações Databricks para executar este script:

  1. Crie um novo bloco de anotações para o script. Consulte Criar um bloco de notas.
  2. Copie o seguinte script Python para a primeira célula do bloco de anotações.
  3. Atualize os valores dos marcadores de posição no script através da substituição de:
    • <databricks-instance> com o nome da instância do espaço de trabalho do Azure Databricks , por exemplo adb-1234567890123456.7.azuredatabricks.net
    • <pipeline-id> com o identificador exclusivo do Hive metastore pipeline para clonar. Você pode encontrar o ID do pipeline na interface do utilizador do Delta Live Tables.
    • <target-catalog-name> com o nome de um catálogo no Unity Catalog no qual o novo pipeline deve publicar. Este deve ser um catálogo existente.
    • <target-schema-name> com o nome de um esquema no Unity Catalog no qual o novo pipeline deve publicar caso seja diferente do nome do esquema atual. Esse parâmetro é opcional e, se não for especificado, o nome do esquema existente será usado.
    • <new-pipeline-name> com um nome opcional para o novo pipeline. Se não for especificado, o novo pipeline será nomeado usando o nome do pipeline de origem com [UC] anexado.
  4. Corre o script. Veja como executar notebooks Databricks.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Limitações

A seguir estão as limitações da solicitação Delta Live Tables clone a pipeline API:

  • Somente a clonagem de um pipeline configurado para usar o metastore do Hive para um pipeline do Unity Catalog é suportada.
  • Você pode criar um clone somente no mesmo espaço de trabalho do Azure Databricks que o pipeline do qual está clonando.
  • O pipeline que você está clonando pode incluir apenas as seguintes fontes de streaming:
    • Fontes Delta
    • Auto Loader, incluindo quaisquer fontes de dados suportadas pelo Auto Loader. Consulte Carregar ficheiros deobjetos de armazenamento na nuvem.
    • Apache Kafka com Streaming Estruturado. No entanto, a fonte Kafka não pode ser configurada para usar a opção kafka.group.id. Consulte Processamento de fluxo com o Apache Kafka e o Azure Databricks.
    • Amazon Kinesis com streaming estruturado. No entanto, a origem do Kinesis não pode ser configurada para definir consumerMode como efo.
  • Se o pipeline de metastore do Hive que você está clonando usar o modo de notificação de arquivo do Auto Loader, o Databricks recomenda não executar o pipeline de metastore do Hive após a clonagem. Isso ocorre porque a execução do pipeline de metastore do Hive resulta na eliminação de alguns eventos de notificação de ficheiros do clone do Unity Catalog. Se o pipeline de metastore do Hive de origem for executado após a conclusão da operação de clonagem, poderás repor os arquivos ausentes usando o "Auto Loader" com a opção cloudFiles.backfillInterval. Para saber mais sobre o modo de notificação de arquivo do Auto Loader, consulte O que é o modo de notificação de arquivo do Auto Loader?. Para saber mais sobre o preenchimento de arquivos com o Auto Loader, consulte Acionar preenchimentos regulares usando cloudFiles.backfillInterval e opções comuns do Auto Loader.
  • As tarefas de manutenção dos pipelines são pausadas automaticamente para ambos os pipelines enquanto a clonagem está em andamento.
  • O seguinte aplica-se a consultas de retrospetiva em tabelas do pipeline clonado do Catálogo Unity:
    • Se uma versão de tabela foi originalmente gravada num objeto gerido por um metastore Hive, consultas de viagem no tempo usando uma cláusula timestamp_expression são indefinidas quando se consulta o objeto clonado no Unity Catalog.
    • No entanto, se a versão da tabela foi gravada no objeto Unity Catalog clonado, as consultas de viagem no tempo usando uma cláusula timestamp_expression funcionam corretamente.
    • As consultas de viagem no tempo usando uma cláusula version funcionam corretamente ao consultar um objeto clonado do Unity Catalog, mesmo quando a versão foi originalmente armazenada no objeto gerido pelo metastore do Hive.
  • Para obter outras limitações ao usar o Delta Live Tables com o Unity Catalog, consulte Unity Catalog pipeline limitations.
  • Para conhecer as limitações do Catálogo Unity, consulte Limitações do Catálogo Unity.