Partilhar via


Executar um pipeline Delta Live Tables em um fluxo de trabalho

Você pode executar um pipeline Delta Live Tables integrado num fluxo de trabalho de processamento de dados utilizando tarefas do Databricks, Apache Airflow ou o Azure Data Factory.

Tarefas

Você pode orquestrar várias tarefas em um trabalho Databricks para implementar um fluxo de trabalho de processamento de dados. Para incluir um pipeline Delta Live Tables em um trabalho, use a tarefa Pipeline ao criar um trabalho. Consulte a tarefa do pipeline Delta Live Tables para trabalhos.

Fluxo de ar Apache

O Apache Airflow é uma solução de código aberto para gerenciar e agendar fluxos de trabalho de dados. O fluxo de ar representa fluxos de trabalho como gráficos acíclicos direcionados (DAGs) das operações. Você define um fluxo de trabalho em um arquivo Python e o Airflow gerencia o agendamento e a execução. Para obter informações sobre como instalar e usar o Airflow com o Azure Databricks, consulte Orquestrar trabalhos do Azure Databricks com o Apache Airflow.

Para executar um pipeline Delta Live Tables como parte de um fluxo de trabalho Airflow, use o DatabricksSubmitRunOperator.

Requisitos

O seguinte é necessário para usar o suporte de fluxo de ar para Delta Live Tables:

  • Airflow versão 2.1.0 ou posterior.
  • O pacote do provedor Databricks versão 2.1.0 ou posterior.

Exemplo

O exemplo a seguir cria um Airflow DAG que aciona uma atualização para o pipeline Delta Live Tables com o identificador 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Substitua CONNECTION_ID pelo identificador de uma conexão de fluxo de ar ao seu espaço de trabalho.

Salve este exemplo no airflow/dags diretório e use a interface do usuário do Airflow para exibir e acionar o DAG. Utilize a interface de utilizador do Delta Live Tables para ver os detalhes da atualização do pipeline.

Azure Data Factory

Nota

O Delta Live Tables e o Azure Data Factory incluem opções para configurar o número de novas tentativas quando ocorre uma falha. Se forem configurados valores de repetição no pipeline do Delta Live Tables e na atividade do Azure Data Factory que chama o pipeline, o número de repetições será o valor de repetição do Azure Data Factory multiplicado pelo valor de repetição do Delta Live Tables.

Por exemplo, se uma atualização de pipeline falhar, o Delta Live Tables tentará novamente a atualização até cinco vezes por padrão. Se a repetição do Azure Data Factory estiver definida como três e o seu pipeline Delta Live Tables utilizar o padrão de cinco repetições, o seu pipeline Delta Live Tables com falha poderá ser repetido até quinze vezes. Para evitar tentativas excessivas de repetição quando as atualizações de pipeline falham, o Databricks recomenda limitar o número de novas tentativas ao configurar o pipeline Delta Live Tables ou a atividade do Azure Data Factory que chama o pipeline.

Para alterar as configurações de tentativa para o seu pipeline do Delta Live Tables, use a configuração pipelines.numUpdateRetryAttempts ao configurar o pipeline.

O Azure Data Factory é um serviço ETL baseado na nuvem que lhe permite orquestrar fluxos de trabalho de integração e transformação de dados. O Azure Data Factory dá suporte direto à execução de tarefas do Azure Databricks em um fluxo de trabalho, incluindo blocos de anotações, tarefas JAR e scripts Python. Você também pode incluir um pipeline em um fluxo de trabalho chamando o Delta Live Tables API de umde atividade da Web do Azure Data Factory . Por exemplo, para disparar uma atualização de pipeline do Azure Data Factory:

  1. Crie uma fábrica de dados ou abra uma fábrica de dados existente.

  2. Quando a criação for concluída, abra a página do seu data factory e clique no bloco Open Azure Data Factory Studio . A interface do usuário do Azure Data Factory é exibida.

  3. Crie um novo pipeline do Azure Data Factory selecionando Pipeline no menu suspenso Novo na interface do usuário do Azure Data Factory Studio.

  4. Na caixa de ferramentas Atividades, expanda Geral e arraste a atividade da Web para a tela do pipeline. Clique na guia Configurações e insira os seguintes valores:

    Nota

    Como prática recomendada de segurança, quando você se autentica com ferramentas, sistemas, scripts e aplicativos automatizados, o Databricks recomenda que você use tokens de acesso pessoal pertencentes a entidades de serviço em vez de usuários do espaço de trabalho. Para criar tokens para entidades de serviço, consulte Gerenciar tokens para uma entidade de serviço.

    • .

      Substitua <get-workspace-instance>.

      Substitua <pipeline-id> pelo identificador da linha de processamento.

    • Método: Selecione POST no menu suspenso.

    • Cabeçalhos: Clique em + Novo. Na caixa de texto Nome, digite Authorization. Na caixa de texto Valor, digite Bearer <personal-access-token>.

      Substitua <personal-access-token> por um token de acesso pessoal do Azure Databricks.

    • Body: Para passar parâmetros de solicitação adicionais, insira um documento JSON contendo os parâmetros. Por exemplo, para iniciar uma atualização e reprocessar todos os dados para o pipeline: {"full_refresh": "true"}. Se não houver parâmetros de solicitação adicionais, insira chaves vazias ({}).

Para testar a atividade da Web, clique em Depurar na barra de ferramentas do pipeline na interface do usuário do Data Factory. A saída e o status da execução, incluindo erros, são exibidos na guia Saída do pipeline do Azure Data Factory. Use a interface de utilizador do Delta Live Tables para visualizar os detalhes da atualização do pipeline.

Gorjeta

Um requisito comum do fluxo de trabalho é iniciar uma tarefa após a conclusão de uma tarefa anterior. Como a solicitação do updates Delta Live Tables é assíncrona — a solicitação retorna após iniciar a atualização, mas antes que a atualização seja concluída — as tarefas em seu pipeline do Azure Data Factory com uma dependência da atualização Delta Live Tables devem aguardar a conclusão da atualização. Uma opção para aguardar a conclusão da atualização é adicionar uma atividade Até, após a atividade web que aciona a atualização Delta Live Tables. Na atividade Até:

  1. Adicione uma atividade Esperar para aguardar um número configurado de segundos para a conclusão da atualização.
  2. Adicione uma atividade da Web após a atividade Esperar que usa a solicitação de detalhes de atualização do Delta Live Tables para obter o status da atualização. O campo state na resposta retorna o estado atual da atualização, inclusive se ela foi concluída.
  3. Utilize o valor do campo state para definir a condição de término da atividade "Até". Você também pode usar uma atividade Definir variável para adicionar uma variável de pipeline com base no valor e usar essa variável para a condição final.