Executar um pipeline das Tabelas Dinâmicas Delta em um fluxo de trabalho
Você pode executar um pipeline das Tabelas Dinâmicas Delta como parte de um fluxo de trabalho de processamento de dados com trabalhos do Databricks, com o Apache Airflow ou com o Azure Data Factory.
Trabalhos
Orquestre várias tarefas em um trabalho do Databricks para implementar um fluxo de trabalho de processamento de dados. Para incluir um pipeline do Delta Live Tables em um trabalho, use a tarefa Pipeline ao criar um trabalho. Confira Tarefa de pipelines do Delta Live Tables para trabalhos.
Apache Airflow
O Apache Airflow é uma solução de código aberto usada para gerenciar e agendar pipelines de dados. O Airflow representa os fluxos de trabalho como DAGs (grafos direcionados acíclicos) de operações. Você define um fluxo de trabalho em um arquivo Python, e o Airflow gerencia o agendamento e a execução. Para obter informações sobre como instalar e usar o Airflow com o Azure Databricks, confira Orquestrar trabalhos do Azure Databricks com o Apache Airflow.
Para executar um pipeline das Tabelas Dinâmicas Delta como parte de um fluxo de trabalho do Airflow, use DatabricksubmitRunOperator.
Requisitos
Os seguintes requisitos são necessários para usar o suporte do Airflow nas Tabelas Dinâmicas Delta:
- Airflow versão 2.1.0 ou posterior.
- O pacote do provedor do Databricks versão 2.1.0 ou posterior.
Exemplo
O seguinte exemplo cria um DAG do Airflow que dispara uma atualização para o pipeline das Tabelas Dinâmicas Delta com o identificador 8279d543-063c-4d63-9926-dae38e35ce8b
:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Substitua CONNECTION_ID
pelo identificador de uma conexão do Airflow com seu workspace.
Salve este exemplo no diretório airflow/dags
e use a interface do usuário do Airflow para ver e disparar o DAG. Use a interface do usuário das Tabelas Dinâmicas Delta para ver os detalhes da atualização do pipeline.
Fábrica de dados do Azure
O Azure Data Factory é um serviço de ETL baseado em nuvem que permite orquestrar fluxos de trabalho de transformação e integração de dados. O Azure Data Factory dá suporte direto à execução de tarefas do Azure Databricks em um fluxo de trabalho, incluindo notebooks, tarefas JAR e scripts Python. Você também pode incluir um pipeline em um fluxo de trabalho chamando a API das Tabelas Dinâmicas Delta em uma atividade da Web do Azure Data Factory. Por exemplo, para disparar uma atualização de pipeline do Azure Data Factory:
Crie um data factory ou abra um existente.
Quando a criação for concluída, abra a página do data factory e clique no bloco Abrir Azure Data Factory Studio. A interface do usuário do Azure Data Factory será exibida.
Crie um pipeline do Azure Data Factory selecionando Pipeline no menu suspenso Novo na interface do usuário do Azure Data Factory Studio.
Na caixa de ferramentas Atividades, expanda Geral e arraste a atividade da Web para a tela do pipeline. Clique na guia Configurações e insira os seguintes valores:
Observação
Como melhor prática de segurança, ao autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, o Databricks recomenda que você use tokens de acesso pertencentes às entidades de serviço e não aos usuários do workspace. Para criar tokens para entidades de serviço, confira Gerenciar tokens para uma entidade de serviço.
URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Substituir
<get-workspace-instance>
.Substitua
<pipeline-id>
pelo identificador do pipeline.Método: selecione POST no menu suspenso.
Cabeçalhos: clique em + Novo. Na caixa de texto Nome, insira
Authorization
. Na caixa de texto Valor, insiraBearer <personal-access-token>
.Substitua
<personal-access-token>
por um token de acesso pessoal do Azure Databricks.Corpo: para transmitir parâmetros de solicitação adicionais, insira um documento JSON que contenha os parâmetros. Por exemplo, para iniciar uma atualização e reprocessar todos os dados do pipeline:
{"full_refresh": "true"}
. Se não houver parâmetros de solicitação adicionais, insira chaves vazias ({}
).
Para testar a atividade da Web, clique em Depurar na barra de ferramentas do pipeline na interface do usuário do Data Factory. A saída e o status da operação, incluindo erros, são exibidos na guia Saída do pipeline do Azure Data Factory. Use a interface do usuário das Tabelas Dinâmicas Delta para ver os detalhes da atualização do pipeline.
Dica
Um requisito comum de fluxo de trabalho é iniciar uma tarefa após a conclusão de uma tarefa anterior. Como a solicitação updates
das Tabelas Dinâmicas Delta é assíncrona, a solicitação é retornada depois de iniciar a atualização, mas antes que a atualização seja concluída, as tarefas do pipeline do Azure Data Factory com uma dependência na atualização das Tabelas Dinâmicas Delta precisam aguardar a conclusão da atualização. Uma opção usada para aguardar a conclusão da atualização é adicionar uma atividade Until após a atividade da Web que dispara a atualização das Tabelas Dinâmicas Delta. Na atividade Until:
- Adicione uma atividade Wait para aguardar um número configurado de segundos até a conclusão da atualização.
- Adicione uma atividade da Web após a atividade Wait que usa a solicitação Obter detalhes de atualização das Tabelas Dinâmicas Delta para obter o status da atualização. O campo
state
na resposta retorna o estado atual da atualização, incluindo a indicação da conclusão dela. - Use o valor do campo
state
para definir a condição de encerramento da atividade Until. Use também uma atividade Definir Variável para adicionar uma variável de pipeline com base no valorstate
e usar essa variável para a condição de encerramento.