Partilhar via


Executar um pipeline de DLT em um fluxo de trabalho

Você pode operar um pipeline DLT como parte de um fluxo de trabalho de processamento de dados com tarefas do Databricks, Apache Airflow ou Azure Data Factory.

Empregos

Você pode orquestrar várias tarefas em um trabalho Databricks para implementar um fluxo de trabalho de processamento de dados. Para incluir um pipeline de DLT em um trabalho, use a tarefa Pipeline ao criar um trabalho. Consulte a tarefa do pipeline de DLT para os trabalhos.

Apache Airflow

Apache Airflow é uma solução de código aberto para gerenciar e agendar fluxos de trabalho de dados. Airflow representa fluxos de trabalho como gráficos acíclicos direcionados (DAGs) de operações. Você define um fluxo de trabalho em um arquivo Python e o Airflow gerencia o agendamento e a execução. Para obter informações sobre como instalar e usar o Airflow com o Azure Databricks, consulte Orquestrar trabalhos do Azure Databricks com o Apache Airflow.

Para executar um pipeline DLT como parte de um fluxo de trabalho Airflow, use o DatabricksSubmitRunOperator.

Requerimentos

O seguinte é necessário para usar o suporte de fluxo de ar para DLT:

  • Airflow versão 2.1.0 ou posterior.
  • O pacote do provedor Databricks na versão 2.1.0 ou posterior.

Exemplo

O exemplo a seguir cria um DAG do Airflow que gatilha uma atualização para o pipeline de DLT com o identificador 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Substitua CONNECTION_ID pelo identificador de uma conexão de fluxo de ar ao seu espaço de trabalho.

Salve este exemplo no diretório airflow/dags e use a interface do usuário do Airflow para exibir e acionar o DAG. Utilize a interface de utilizador DLT para ver os detalhes da atualização do pipeline.

Azure Data Factory

Observação

A DLT e o Azure Data Factory incluem opções para configurar o número de novas tentativas quando ocorre uma falha. Se os valores de repetição estiverem configurados no seu pipeline DLT e, na atividade do Azure Data Factory que invoca o pipeline, o número de novas tentativas será o valor de repetição do Azure Data Factory multiplicado pelo valor de repetição de DLT.

Por exemplo, se uma atualização de pipeline falhar, a DLT tentará novamente a atualização até cinco vezes por padrão. Se o número de tentativas do Azure Data Factory estiver definido como três e o seu pipeline de DLT usar o padrão de cinco tentativas de repetição, o seu pipeline de DLT com falha poderá ser tentado novamente até quinze vezes. Para evitar tentativas excessivas de repetição quando as atualizações de pipeline falham, o Databricks recomenda limitar o número de novas tentativas ao configurar o pipeline DLT ou a atividade do Azure Data Factory que chama o pipeline.

Para alterar a configuração de repetição do seu pipeline DLT, use a configuração pipelines.numUpdateRetryAttempts quando estiver a configurar o pipeline.

O Azure Data Factory é um serviço ETL baseado na nuvem que lhe permite orquestrar fluxos de trabalho de integração e transformação de dados. O Azure Data Factory dá suporte direto à execução de tarefas do Azure Databricks num fluxo de trabalho, incluindo notebooks , tarefas JAR e scripts Python. Você também pode incluir um pipeline em um fluxo de trabalho chamando a API do DLT a partir de uma atividade Web do Azure Data Factory. Por exemplo, para disparar uma atualização de pipeline do Azure Data Factory:

  1. Crie uma fábrica de dados ou abra uma fábrica de dados existente.

  2. Quando a criação for concluída, abra a página da sua fábrica de dados e clique no mosaico Open Azure Data Factory Studio. A interface do usuário do Azure Data Factory é exibida.

  3. Crie um novo pipeline do Azure Data Factory selecionando Pipeline no menu suspenso Novo na interface do usuário do Azure Data Factory Studio.

  4. Na caixa de ferramentas Atividades, expanda Geral e arraste a atividade Web para a tela do pipeline. Clique no separador Definições e insira os seguintes valores:

    Observação

    Como prática recomendada de segurança, quando você se autentica com ferramentas, sistemas, scripts e aplicativos automatizados, o Databricks recomenda que você use tokens de acesso pessoal pertencentes a entidades de serviço em vez de usuários do espaço de trabalho. Para criar tokens para entidades de serviço, consulte Gerenciar tokens para uma entidade de serviço.

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Substitua <get-workspace-instance>.

      Substitua <pipeline-id> pelo identificador do pipeline.

    • Método: Selecione POST no menu suspenso.

    • Cabeçalhos: Clique em + Novo. Na caixa de texto Nome, digite Authorization. Na caixa de texto Valor, digite Bearer <personal-access-token>.

      Substitua <personal-access-token> por um Azure Databricks token de acesso pessoal.

    • Body: Para passar parâmetros de solicitação adicionais, insira um documento JSON contendo os parâmetros. Por exemplo, para iniciar uma atualização e reprocessar todos os dados da pipeline: {"full_refresh": "true"}. Se não houver parâmetros de solicitação adicionais, utilize colchetes vazios ({}).

Para testar a atividade da Web, clique em Depurar na barra de ferramentas do pipeline na interface do usuário do Data Factory. A saída e o status da execução, incluindo erros, são exibidos no separador Saída do pipeline do Azure Data Factory. Use a interface DLT para ver os detalhes da atualização do pipeline.

Dica

Um requisito comum do fluxo de trabalho é iniciar uma tarefa após a conclusão de uma tarefa anterior. Como a solicitação de updates DLT é assíncrona — a solicitação retorna após iniciar a atualização, mas antes que a atualização seja concluída — as tarefas em seu pipeline do Azure Data Factory com uma dependência da atualização DLT devem aguardar a conclusão da atualização. Uma opção para aguardar a conclusão da atualização é adicionar uma atividade Até após a atividade da Web que aciona a atualização DLT. Na atividade Até:

  1. Adicione uma atividade Esperar para aguardar um número configurado de segundos para a conclusão da atualização.
  2. Adicione uma atividade Web após a atividade de Espera que utiliza o pedido de detalhes de atualização DLT para obter o status da atualização. O campo state na resposta retorna o estado atual da atualização, inclusive se ela foi concluída.
  3. Utilize o valor do campo state para definir a condição de término para a atividade 'Until'. Você também pode usar um de atividade Definir variável para adicionar uma variável de pipeline com base no valor state e usar essa variável para a condição de encerramento.