Orquestrar trabalhos do Azure Databricks com o Apache Airflow
Este artigo descreve o suporte do Apache Airflow para orquestrar pipelines de dados com o Azure Databricks, tem instruções para instalar e configurar o Airflow localmente e fornece um exemplo de implantação e execução de um fluxo de trabalho do Azure Databricks com o Airflow.
Orquestração de trabalho em um pipeline de dados
O desenvolvimento e a implantação de um pipeline de processamento de dados geralmente exigem o gerenciamento de dependências complexas entre tarefas. Por exemplo, um pipeline pode ler dados de uma fonte, limpar os dados, transformar os dados limpos e escrever os dados transformados em um destino. Você também precisa de suporte para testar, agendar e solucionar problemas de erros ao operacionalizar um pipeline.
Os sistemas de fluxo de trabalho abordam esses desafios permitindo que você defina dependências entre tarefas, agende o horário de execução dos pipelines e monitore os fluxos de trabalho. O Apache Airflow é uma solução de código aberto usada para gerenciar e agendar pipelines de dados. O Airflow representa os pipelines de dados como DAGs (grafos direcionados acíclicos) de operações. Você define um fluxo de trabalho em um arquivo Python, e o Airflow gerencia o agendamento e a execução. A conexão entre o Airflow e o Azure Databricks permite que você aproveite o mecanismo do Spark otimizado oferecido pelo Azure Databricks com os recursos de agendamento do Airflow.
Requisitos
- A integração entre o Airflow e o Azure Databricks está disponível no Airflow versão 2.5.0 e posterior. Os exemplos deste artigo são testados com o Airflow versão 2.6.1.
- O Airflow exige Python 3.8, 3.9, 3.10 ou 3.11. Os exemplos deste artigo são testados com o Python 3.8.
- As instruções neste artigo para instalar e executar o Airflow exigem pipenv para criar um ambiente virtual do Python.
Operadores do Airflow para Databricks
Um DAG do Airflow é composto de tarefas, em que cada tarefa executa um Operador do Airflow. Os operadores do Airflow que dão suporte à integração ao Databricks são implementados no provedor do Databricks.
O provedor do Databricks inclui operadores para executar várias tarefas em um workspace do Azure Databricks, incluindo importar dados para uma tabela, executar consultas SQL e trabalhar com pastas Git do Databricks.
O provedor do Databricks implementa dois operadores para disparar trabalhos:
- O DatabricksRunNowOperator exige um trabalho do Azure Databricks existente e usa a solicitação de API POST /api/2.1/jobs/run-now para disparar uma execução. O Databricks recomenda o uso do
DatabricksRunNowOperator
porque ele reduz a duplicação de definições de trabalho e as execuções de trabalho disparadas com esse operador são fáceis de serem encontradas na interface do usuário de Trabalhos. - O DatabricksSubmitRunOperator não exige a existência de um trabalho no Azure Databricks e usa a solicitação de API POST /api/2.1/jobs/runs/submit para enviar a especificação do trabalho e disparar uma execução.
Para criar um novo trabalho do Azure Databricks ou redefinir um trabalho existente, o provedor do Databricks implementa o DatabricksCreateJobsOperator. O DatabricksCreateJobsOperator
usa as solicitações de API POST /api/2.1/jobs/create e POST /api/2.1/jobs/reset. Você pode usar o DatabricksCreateJobsOperator
com o DatabricksRunNowOperator
para criar e executar um trabalho.
Observação
Usar os operadores do Databricks para disparar um trabalho requer o fornecimento de credenciais na configuração de conexão do Databricks. Consulte Criar um token de acesso pessoal do Azure Databricks para o Airflow.
O operador do Databricks Airflow grava a URL da página de execução de trabalho nos logs do Airflow a cada polling_period_seconds
(o padrão é de 30 segundos). Para obter mais informações, confira a página do pacote apache-airflow-providers-databricks no site do Airflow.
Instalar a integração entre o Airflow e o Azure Databricks
Para instalar o Airflow e o provedor do Databricks localmente para teste e desenvolvimento, use as etapas a seguir. Para outras opções de instalação do Airflow, incluindo a criação de uma instalação de produção, consulte a seção instalação na documentação do Airflow.
Abra um terminal e execute os seguintes comandos:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
Substitua <firstname>
, <lastname>
e <email>
por seu nome de usuário e email. Você será solicitado a inserir uma senha para o usuário administrador. Salve essa senha porque ela é necessária para entrar na interface do usuário do Airflow.
O script executa as seguintes etapas:
- Cria um diretório chamado
airflow
e muda para esse diretório. - Usa
pipenv
para criar e gerar um ambiente virtual do Python. O Databricks recomenda usar um ambiente virtual do Python para isolar as versões de pacote e as dependências de código nesse ambiente. Esse isolamento ajuda a reduzir incompatibilidades inesperadas de versões do pacote e colisões de dependência de código. - Inicializa uma variável de ambiente chamada
AIRFLOW_HOME
definida para o caminho do diretórioairflow
. - Instala o Airflow e os pacotes de provedor do Databricks no Airflow.
- Cria um diretório
airflow/dags
. O Airflow usa o diretóriodags
para armazenar definições de DAG. - Inicializa um banco de dados SQLite usado pelo Airflow para acompanhar os metadados. Em uma implantação de produção do Airflow, o Airflow é configurado com um banco de dados padrão. O banco de dados SQLite e a configuração padrão para a implantação do Airflow são inicializados no diretório
airflow
. - Cria um usuário administrador para o Airflow.
Dica
Para confirmar a instalação do provedor do Databricks, execute o seguinte comando no diretório de instalação do Airflow:
airflow providers list
Iniciar o agendador e o servidor Web do Airflow
O servidor Web do Airflow é necessário para exibir a interface do usuário do Airflow. Para iniciar o servidor Web, abra um terminal no diretório de instalação do Airflow e execute os seguintes comandos:
Observação
Se o servidor Web do Airflow não for iniciado devido a um conflito de porta, você poderá alterar a porta padrão na configuração do Airflow.
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
O agendador é o componente do Airflow que agenda os DAGs. Para iniciar o agendador, abra um novo terminal no diretório de instalação do Airflow e execute os seguintes comandos:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Testar a instalação do Airflow
Para verificar a instalação do Airflow, você pode executar um dos DAGs de exemplo incluídos no Airflow:
- Em uma janela do navegador, abra
http://localhost:8080/home
. Entre na interface do usuário do Airflow com o nome de usuário e a senha que você criou ao instalar o Airflow. A página DAGs do Airflow é exibida. - Clique na alternância Pausar/Retomar DAG para retomar um dos DAGs de exemplo, como o
example_python_operator
. - Dispare o DAG de exemplo clicando no botão Disparar DAG.
- Clique no nome do DAG para ver detalhes, incluindo o status de execução do DAG.
Criar um token de acesso pessoal do Azure Databricks para o Airflow
O Airflow se conecta ao Databricks usando um token de acesso pessoal (PAT) do Azure Databricks. Para criar um PAT, siga as etapas em Tokens de acesso pessoal do Azure Databricks para usuários do workspace.
Observação
Como melhor prática de segurança, ao autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, o Databricks recomenda que você use tokens de acesso pertencentes às entidades de serviço e não aos usuários do workspace. Para criar tokens para entidades de serviço, consulte Gerenciar tokens para uma entidade de serviço.
Você também pode autenticar no Azure Databricks usando um token do Microsoft Entra ID. Consulte Conexão do Databricks na documentação do Airflow.
Configurar uma conexão do Azure Databricks
A instalação do Airflow contém uma conexão padrão com o Azure Databricks. Para atualizar a conexão e se conectar ao seu workspace usando o token de acesso pessoal que você criou acima:
- Em uma janela do navegador, abra
http://localhost:8080/connection/list/
. Se solicitado a entrar, insira o nome de usuário e a senha do administrador. - Em ID da Conexão, localize databricks_default e clique no botão Editar registro.
- Substitua o valor no campo Host pelo nome da instância do workspace da sua implantação do Azure Databricks, por exemplo,
https://adb-123456789.cloud.databricks.com
. - No campo Senha, insira o token de acesso pessoal do Azure Databricks.
- Clique em Save (Salvar).
Se você estiver usando um token do Microsoft Entra ID, consulte a seção Conexão do Databricks na documentação do Airflow para obter informações sobre como configurar a autenticação.
Exemplo: criar um DAG do Airflow para executar um trabalho do Azure Databricks
O exemplo a seguir demonstra como criar uma implantação simples do Airflow que é executada no computador local e implanta um DAG de exemplo para disparar as execuções no Azure Databricks. Neste exemplo, você vai:
- Criar um notebook e adicionar um código para imprimir uma saudação de acordo com um parâmetro configurado.
- Criar um trabalho do Azure Databricks com uma só tarefa que executa o notebook.
- Configurar uma conexão do Airflow com seu workspace do Azure Databricks.
- Criar um DAG do Airflow para disparar o trabalho do notebook. O DAG é definido em um script Python por meio de
DatabricksRunNowOperator
. - Usar a interface do usuário do Airflow para disparar o DAG e ver o status de execução.
Criar um notebook
Este exemplo usa um notebook que contém duas células:
- A primeira célula contém um widget de texto dos Utilitários do Databricks definindo uma variável chamada
greeting
definida como o valor padrãoworld
. - A segunda célula imprime o valor da variável
greeting
prefixada porhello
.
Para criar o notebook:
Acesse o workspace do Azure Databricks, clique em Novo na barra lateral e selecione Notebook.
Dê um nome ao notebook, como Hello Airflow, e verifique se a linguagem padrão está definida como Python.
Copie o código Python a seguir e cole-o na primeira célula do notebook.
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
Adicione uma nova célula abaixo da primeira célula e copie e cole o seguinte código Python na nova célula:
print("hello {}".format(greeting))
Criar um trabalho
Clique em Fluxos de trabalho na barra lateral.
Clique no .
A guia Tarefas aparece com a caixa de diálogo de criação de tarefa.
Substitua Adicionar um nome ao trabalho… pelo nome do trabalho.
No campo Nome da tarefa, insira um nome para a tarefa, por exemplo, greeting-task.
No menu suspenso Tipo, selecione Notebook.
No menu suspenso Origem, selecione Workspace.
Clique na caixa de texto Caminho e use o pesquisador de arquivos para encontrar o notebook que você criou, clique no nome do navegador e clique em Confirmar.
Clique em Adicionar em Parâmetros. No campo Chave, insira
greeting
. No campo Valor, insiraAirflow user
.Clique em Criar tarefa.
No painel Detalhes do trabalho, copie o valor da ID do trabalho. Esse valor é necessário para disparar o trabalho do Airflow.
Executar o trabalho
Para testar seu novo trabalho na interface do usuário dos Trabalhos do Azure Databricks, clique em no canto superior direito. Quando a execução for concluída, você poderá verificar a saída exibindo os detalhes da execução do trabalho.
Criar um novo DAG do Airflow
Um DAG do Airflow é definido em um arquivo Python. Para criar um DAG e disparar o exemplo de trabalho de notebook:
Em um editor de texto ou um IDE, crie um arquivo chamado
databricks_dag.py
com o seguinte conteúdo:from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
Substitua
JOB_ID
pelo valor da ID do trabalho salvo anteriormente.Salve o arquivo no diretório
airflow/dags
. O Airflow lê e instala automaticamente os arquivos DAG armazenados emairflow/dags/
.
Instalar e verificar o DAG no Airflow
Para disparar e verificar o DAG na interface do usuário do Airflow:
- Em uma janela do navegador, abra
http://localhost:8080/home
. A tela DAGs do Airflow será exibida. - Localize
databricks_dag
e clique na alternância Pausar/Retomar DAG para retomar o DAG. - Dispare o DAG clicando no botãoDisparar DAG.
- Clique em uma execução na coluna Execuções para ver o status e os detalhes da execução.