Organizowanie zadań usługi Azure Databricks za pomocą platformy Apache Airflow
W tym artykule opisano obsługę platformy Apache Airflow do organizowania potoków danych za pomocą usługi Azure Databricks, przedstawiono instrukcje dotyczące instalowania i konfigurowania przepływu pracy airflow lokalnie oraz przedstawiono przykład wdrażania i uruchamiania przepływu pracy usługi Azure Databricks za pomocą rozwiązania Airflow.
Orkiestracja zadań w potoku danych
Tworzenie i wdrażanie potoku przetwarzania danych często wymaga zarządzania złożonymi zależnościami między zadaniami. Na przykład potok może odczytywać dane ze źródła, czyścić dane, przekształcać oczyszczone dane i zapisywać przekształcone dane do miejsca docelowego. Potrzebna jest również obsługa testowania, planowania i rozwiązywania problemów z błędami podczas operacjonalizacji potoku.
Systemy przepływów pracy odpowiadają tym wyzwaniom, umożliwiając definiowanie zależności między zadaniami, planowanie uruchamiania potoków i monitorowanie przepływów pracy. Apache Airflow to rozwiązanie typu open source do zarządzania potokami danych i planowania ich. Przepływ powietrza reprezentuje potoki danych jako ukierunkowane grafy acykliczne (DAG) operacji. Definiujesz przepływ pracy w pliku języka Python, a aplikacja Airflow zarządza planowaniem i wykonywaniem. Połączenie airflow Azure Databricks umożliwia korzystanie ze zoptymalizowanego aparatu Spark oferowanego przez usługę Azure Databricks z funkcjami planowania airflow.
Wymagania
- Integracja platformy Airflow z usługą Azure Databricks wymaga rozwiązania Airflow w wersji 2.5.0 lub nowszej. Przykłady w tym artykule są testowane przy użyciu rozwiązania Airflow w wersji 2.6.1.
- Przepływ powietrza wymaga języka Python 3.8, 3.9, 3.10 lub 3.11. Przykłady w tym artykule są testowane przy użyciu języka Python 3.8.
- Instrukcje opisane w tym artykule dotyczące instalowania i uruchamiania platformy Airflow wymagają potoku w celu utworzenia środowiska wirtualnego języka Python.
Operatory przepływu powietrza dla usługi Databricks
Grupa DAG przepływu powietrza składa się z zadań, w których każde zadanie uruchamia operator przepływu powietrza. Operatory przepływu powietrza obsługujące integrację z usługą Databricks są implementowane w dostawcy usługi Databricks.
Dostawca usługi Databricks obejmuje operatory do uruchamiania wielu zadań w obszarze roboczym usługi Azure Databricks, w tym importowania danych do tabeli, uruchamiania zapytań SQL i pracy z folderami Git usługi Databricks.
Dostawca usługi Databricks implementuje dwa operatory do wyzwalania zadań:
- Narzędzie DatabricksRunNowOperator wymaga istniejącego zadania usługi Azure Databricks i używa żądania INTERFEJSu API POST /api/2.1/jobs/run-now , aby wyzwolić uruchomienie. Usługa Databricks zaleca użycie elementu ,
DatabricksRunNowOperator
ponieważ zmniejsza duplikację definicji zadań, a uruchomienia zadań wyzwalane za pomocą tego operatora można znaleźć w interfejsie użytkownika zadań. - Narzędzie DatabricksSubmitRunOperator nie wymaga, aby zadanie istniało w usłudze Azure Databricks i używa żądania POST /api/2.1/jobs/runs/submit API w celu przesłania specyfikacji zadania i wyzwolenia przebiegu.
Aby utworzyć nowe zadanie usługi Azure Databricks lub zresetować istniejące zadanie, dostawca usługi Databricks implementuje moduł DatabricksCreateJobsOperator. Używa DatabricksCreateJobsOperator
żądań INTERFEJSu API POST /api/2.1/jobs/create i POST /api/2.1/jobs/reset API. Możesz użyć DatabricksCreateJobsOperator
elementu z elementem , DatabricksRunNowOperator
aby utworzyć i uruchomić zadanie.
Uwaga
Wyzwalanie zadania przy użyciu operatorów usługi Databricks wymaga podania poświadczeń w konfiguracji połączenia usługi Databricks. Zobacz Tworzenie osobistego tokenu dostępu usługi Azure Databricks dla rozwiązania Airflow.
Operatory przepływu powietrza usługi Databricks zapisują adres URL strony uruchamiania zadania do dzienników airflow co polling_period_seconds
(wartość domyślna to 30 sekund). Aby uzyskać więcej informacji, zobacz stronę pakietu apache-airflow-providers-databricks w witrynie internetowej Airflow.
Lokalne instalowanie integracji rozwiązania Airflow Azure Databricks
Aby zainstalować platformę Airflow i dostawcę usługi Databricks lokalnie na potrzeby testowania i programowania, wykonaj następujące kroki. Inne opcje instalacji systemu Airflow, w tym tworzenie instalacji produkcyjnej, można znaleźć w dokumentacji rozwiązania Airflow.
Otwórz terminal i uruchom następujące polecenia:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
Zastąp <firstname>
ciąg , <lastname>
i <email>
nazwą użytkownika i adresem e-mail. Zostanie wyświetlony monit o wprowadzenie hasła dla użytkownika administracyjnego. Pamiętaj, aby zapisać to hasło, ponieważ jest wymagane zalogowanie się do interfejsu użytkownika aplikacji Airflow.
Ten skrypt wykonuje następujące kroki:
- Tworzy katalog o nazwie
airflow
i zmienia się w tym katalogu. - Używa
pipenv
metody do tworzenia i tworzenia środowiska wirtualnego języka Python. Usługa Databricks zaleca używanie środowiska wirtualnego języka Python do izolowania wersji pakietów i zależności kodu do tego środowiska. Ta izolacja pomaga zmniejszyć nieoczekiwane niezgodności wersji pakietu i kolizje zależności kodu. - Inicjuje zmienną środowiskową o nazwie
AIRFLOW_HOME
set na ścieżkęairflow
katalogu. - Instaluje pakiety dostawcy Airflow i Airflow Databricks.
airflow/dags
Tworzy katalog. Funkcja Airflow używadags
katalogu do przechowywania definicji DAG.- Inicjuje bazę danych SQLite używaną przez funkcję Airflow do śledzenia metadanych. W przypadku wdrożenia produkcyjnego systemu Airflow należy skonfigurować przepływ powietrza przy użyciu standardowej bazy danych. Baza danych SQLite i domyślna konfiguracja wdrożenia airflow są inicjowane w
airflow
katalogu. - Tworzy użytkownika administratora aplikacji Airflow.
Napiwek
Aby potwierdzić instalację dostawcy usługi Databricks, uruchom następujące polecenie w katalogu instalacyjnym Airflow:
airflow providers list
Uruchamianie serwera internetowego airflow i harmonogramu
Aby wyświetlić interfejs użytkownika przepływu powietrza, wymagany jest serwer internetowy Airflow. Aby uruchomić serwer internetowy, otwórz terminal w katalogu instalacyjnym Airflow i uruchom następujące polecenia:
Uwaga
Jeśli nie można uruchomić serwera internetowego Airflow z powodu konfliktu portów, możesz zmienić domyślny port w konfiguracji airflow.
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
Harmonogram jest składnikiem Airflow, który planuje grupy DAG. Aby uruchomić harmonogram, otwórz nowy terminal w katalogu instalacyjnym Airflow i uruchom następujące polecenia:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Testowanie instalacji systemu Airflow
Aby sprawdzić instalację rozwiązania Airflow, możesz uruchomić jedną z przykładowych grup DAG dołączonych do rozwiązania Airflow:
- W oknie przeglądarki otwórz plik
http://localhost:8080/home
. Zaloguj się do interfejsu użytkownika aplikacji Airflow przy użyciu nazwy użytkownika i hasła utworzonego podczas instalowania aplikacji Airflow. Zostanie wyświetlona strona Grupy DAG przepływu powietrza. - Kliknij przełącznik Wstrzymaj /Copause daG, aby usunąć jedną z przykładowych grup DAG , na przykład
example_python_operator
. - Wyzwól przykładowy zestaw DAG, klikając przycisk Trigger DAG (Wyzwalaj grupę DAG ).
- Kliknij nazwę DAG, aby wyświetlić szczegóły, w tym stan uruchomienia grupy DAG.
Tworzenie osobistego tokenu dostępu usługi Azure Databricks dla rozwiązania Airflow
Aplikacja Airflow łączy się z usługą Databricks przy użyciu osobistego tokenu dostępu usługi Azure Databricks (PAT). Aby utworzyć osobisty token dostępu, wykonaj kroki opisane w artykule Osobiste tokeny dostępu usługi Azure Databricks dla użytkowników obszaru roboczego.
Uwaga
Najlepszym rozwiązaniem w zakresie zabezpieczeń w przypadku uwierzytelniania za pomocą zautomatyzowanych narzędzi, systemów, skryptów i aplikacji usługa Databricks zaleca używanie osobistych tokenów dostępu należących do jednostek usługi zamiast użytkowników obszaru roboczego. Aby utworzyć tokeny dla jednostek usługi, zobacz Zarządzanie tokenami dla jednostki usługi.
Możesz również uwierzytelnić się w usłudze Azure Databricks przy użyciu tokenu identyfikatora Entra firmy Microsoft. Zobacz Databricks Connection (Połączenie usługi Databricks) w dokumentacji aplikacji Airflow.
Konfigurowanie połączenia usługi Azure Databricks
Instalacja rozwiązania Airflow zawiera domyślne połączenie dla usługi Azure Databricks. Aby zaktualizować połączenie w celu nawiązania połączenia z obszarem roboczym przy użyciu osobistego tokenu dostępu utworzonego powyżej:
- W oknie przeglądarki otwórz plik
http://localhost:8080/connection/list/
. Jeśli zostanie wyświetlony monit o zalogowanie się, wprowadź nazwę użytkownika i hasło administratora. - W obszarze Identyfikator conn znajdź databricks_default i kliknij przycisk Edytuj rekord .
- Zastąp wartość w polu Host nazwą wystąpienia obszaru roboczego wdrożenia usługi Azure Databricks, na przykład
https://adb-123456789.cloud.databricks.com
. - W polu Hasło wprowadź osobisty token dostępu usługi Azure Databricks.
- Kliknij przycisk Zapisz.
Jeśli używasz tokenu identyfikatora entra firmy Microsoft, zapoznaj się z dokumentacją usługi Databricks w dokumentacji aplikacji Airflow, aby uzyskać informacje na temat konfigurowania uwierzytelniania.
Przykład: tworzenie grupy DAG przepływu powietrza w celu uruchomienia zadania usługi Azure Databricks
W poniższym przykładzie pokazano, jak utworzyć proste wdrożenie systemu Airflow uruchamiane na komputerze lokalnym i wdrożyć przykładową grupę DAG w celu wyzwolenia przebiegów w usłudze Azure Databricks. W tym przykładzie wykonasz następujące elementy:
- Utwórz nowy notes i dodaj kod, aby wydrukować powitanie na podstawie skonfigurowanego parametru.
- Utwórz zadanie usługi Azure Databricks z pojedynczym zadaniem, które uruchamia notes.
- Skonfiguruj połączenie airflow z obszarem roboczym usługi Azure Databricks.
- Utwórz grupę DAG przepływu powietrza, aby wyzwolić zadanie notesu. Grupę DAG można zdefiniować w skryfcie języka Python przy użyciu polecenia
DatabricksRunNowOperator
. - Użyj interfejsu użytkownika przepływu powietrza, aby wyzwolić grupę DAG i wyświetlić stan uruchomienia.
Tworzenie notesu
W tym przykładzie użyto notesu zawierającego dwie komórki:
- Pierwsza komórka zawiera widżet tekstowy Narzędzia usługi Databricks, definiujący zmienną o nazwie
greeting
ustawioną na wartośćworld
domyślną . - Druga komórka wyświetla wartość zmiennej poprzedzonej
greeting
prefiksemhello
.
Aby utworzyć notes:
Przejdź do obszaru roboczego usługi Azure Databricks, kliknij pozycję Nowy na pasku bocznym i wybierz pozycję Notes.
Nadaj notesowi nazwę, taką jak Hello Airflow, i upewnij się, że język domyślny jest ustawiony na python.
Skopiuj następujący kod w języku Python i wklej go w pierwszej komórce notesu.
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
Dodaj nową komórkę poniżej pierwszej komórki i skopiuj i wklej następujący kod w języku Python do nowej komórki:
print("hello {}".format(greeting))
Tworzenie zadania
Kliknij pozycję Przepływy pracy na pasku bocznym.
Kliknij pozycję .
Na karcie Zadania zostanie wyświetlone okno dialogowe tworzenia zadania.
Zastąp ciąg Dodaj nazwę zadania... nazwą zadania.
W polu Nazwa zadania wprowadź nazwę zadania, na przykład greeting-task.
W menu rozwijanym Typ wybierz pozycję Notes.
W menu rozwijanym Źródło wybierz pozycję Obszar roboczy.
Kliknij pole tekstowe Ścieżka i użyj przeglądarki plików, aby znaleźć utworzony notes, kliknij nazwę notesu, a następnie kliknij przycisk Potwierdź.
Kliknij pozycję Dodaj w obszarze Parametry. W polu Klucz wprowadź wartość
greeting
. W polu Wartość wprowadź wartośćAirflow user
.Kliknij pozycję Utwórz zadanie.
Na panelu Szczegóły zadania skopiuj wartość Identyfikator zadania. Ta wartość jest wymagana do wyzwolenia zadania z przepływu powietrza.
Uruchamianie zadania
Aby przetestować nowe zadanie w interfejsie użytkownika zadań usługi Azure Databricks, kliknij w prawym górnym rogu. Po zakończeniu przebiegu możesz zweryfikować dane wyjściowe, wyświetlając szczegóły uruchomienia zadania.
Tworzenie nowej grupy DAG przepływu powietrza
Zdefiniuj grupę DAG przepływu powietrza w pliku języka Python. Aby utworzyć grupę DAG w celu wyzwolenia przykładowego zadania notesu:
W edytorze tekstów lub środowisku IDE utwórz nowy plik o nazwie o
databricks_dag.py
następującej zawartości:from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
Zastąp
JOB_ID
ciąg wartością zapisanego wcześniej identyfikatora zadania.Zapisz plik w
airflow/dags
katalogu. Funkcja Airflow automatycznie odczytuje i instaluje pliki DAG przechowywane w programieairflow/dags/
.
Instalowanie i weryfikowanie grupy DAG w rozwiązaniu Airflow
Aby wyzwolić i zweryfikować grupę DAG w interfejsie użytkownika przepływu powietrza:
- W oknie przeglądarki otwórz plik
http://localhost:8080/home
. Zostanie wyświetlony ekran Grupy DAG przepływu powietrza. - Znajdź
databricks_dag
i kliknij przełącznik Pause/Unpause DAG, aby usunąć grupę DAG . - Wyzwól grupę DAG, klikając przycisk Wyzwalaj grupę DAG .
- Kliknij przebieg w kolumnie Uruchomienia , aby wyświetlić stan i szczegóły przebiegu.