Azure Databricks-taken organiseren met Apache Airflow
In dit artikel wordt de Apache Airflow-ondersteuning beschreven voor het organiseren van gegevenspijplijnen met Azure Databricks, bevat instructies voor het lokaal installeren en configureren van Airflow en biedt een voorbeeld van het implementeren en uitvoeren van een Azure Databricks-werkstroom met Airflow.
Taakindeling in een gegevenspijplijn
Het ontwikkelen en implementeren van een pijplijn voor gegevensverwerking vereist vaak het beheren van complexe afhankelijkheden tussen taken. Een pijplijn kan bijvoorbeeld gegevens lezen uit een bron, de gegevens opschonen, de opgeschoonde gegevens transformeren en de getransformeerde gegevens naar een doel schrijven. U hebt ook ondersteuning nodig voor het testen, plannen en oplossen van fouten bij het operationeel maken van een pijplijn.
Werkstroomsystemen pakken deze uitdagingen aan door u in staat te stellen afhankelijkheden tussen taken te definiëren, te plannen wanneer pijplijnen worden uitgevoerd en werkstromen te bewaken. Apache Airflow is een opensource-oplossing voor het beheren en plannen van gegevenspijplijnen. Airflow vertegenwoordigt gegevenspijplijnen als omgeleide acyclische grafieken (DAG's) van bewerkingen. U definieert een werkstroom in een Python-bestand en Airflow beheert de planning en uitvoering. Met de Airflow Azure Databricks-verbinding kunt u profiteren van de geoptimaliseerde Spark-engine die door Azure Databricks wordt aangeboden met de planningsfuncties van Airflow.
Vereisten
- Voor de integratie tussen Airflow en Azure Databricks is Airflow versie 2.5.0 en hoger vereist. De voorbeelden in dit artikel worden getest met Airflow versie 2.6.1.
- Airflow vereist Python 3.8, 3.9, 3.10 of 3.11. De voorbeelden in dit artikel worden getest met Python 3.8.
- Voor de instructies in dit artikel voor het installeren en uitvoeren van Airflow is pipenv vereist om een virtuele Python-omgeving te maken.
Airflow-operators voor Databricks
Een Airflow DAG bestaat uit taken, where elke taak een Airflow Operatoruitvoert. Airflow-operators die de integratie met Databricks ondersteunen, worden geïmplementeerd in de Databricks-provider.
De Databricks-provider bevat operators voor het uitvoeren van een aantal taken voor een Azure Databricks-werkruimte, waaronder het importeren van gegevens in een table, het uitvoeren van SQL-query'sen het werken met Databricks Git-mappen.
De Databricks-provider implementeert twee operators voor het activeren van taken:
- De DatabricksRunNowOperator vereist een bestaande Azure Databricks-taak en gebruikt de POST/api/2.1/jobs/run-now API-aanvraag om een uitvoering te activeren. Databricks raadt aan het gebruik van de
DatabricksRunNowOperator
functie te gebruiken, omdat het duplicatie van taakdefinities vermindert en taakuitvoeringen die met deze operator worden geactiveerd, kunnen worden gevonden in de gebruikersinterface van Taken. - De DatabricksSubmitRunOperator vereist geen taak in Azure Databricks en gebruikt de POST /api/2.1/jobs/runs/submit API-aanvraag om de taakspecificatie in te dienen en een uitvoering te activeren.
Als u een nieuwe Azure Databricks-taak of reset een bestaande taak wilt maken, implementeert de Databricks-provider de DatabricksCreateJobsOperator. De DatabricksCreateJobsOperator
gebruikt de POST /api/2.1/jobs/create en POST /api/2.1/jobs/reset API-aanvragen. U kunt de DatabricksCreateJobsOperator
functie gebruiken om DatabricksRunNowOperator
een taak te maken en uit te voeren.
Notitie
Als u de Databricks-operators gebruikt om een taak te activeren, moet u credentials bieden in de configuratie van de Databricks-verbinding. Zie Een persoonlijk toegangstoken voor Azure Databricks maken voor Airflow.
De Databricks Airflow-operators schrijven de URL van de taakuitvoeringspagina naar de Airflow-logboeken elke polling_period_seconds
(de standaardwaarde is 30 seconden). Zie de Apache-Airflow-providers-Databricks-pakketpagina op de Airflow-website voor meer informatie.
De integratie van Airflow Azure Databricks lokaal installeren
Gebruik de volgende stappen om Airflow en de Databricks-provider lokaal te installeren voor testen en ontwikkelen. Zie de installatie in de Airflow-documentatie voor andere installatieopties, waaronder het maken van een productie-installatie.
Open een terminal en voer de volgende opdrachten uit:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
Vervang <firstname>
, <lastname>
en <email>
door uw gebruikersnaam en e-mailadres. U wordt gevraagd een wachtwoord in te voeren voor de gebruiker met beheerdersrechten. Zorg ervoor dat u dit wachtwoord opslaat omdat het is vereist om u aan te melden bij de gebruikersinterface van Airflow.
Met dit script worden de volgende stappen uitgevoerd:
- Hiermee maakt u een map met de naam
airflow
en wijzigingen in die map. - Hiermee
pipenv
maakt en spawt u een virtuele Python-omgeving. Databricks raadt aan om een virtuele Python-omgeving te gebruiken om pakketversies en codeafhankelijkheden voor die omgeving te isoleren. Deze isolatie helpt onverwachte niet-overeenkomende pakketversies en codeafhankelijkheidsconflicten te verminderen. - Initialiseert een omgevingsvariabele met de naam
AIRFLOW_HOME
set met het pad van deairflow
directory. - Installeert Airflow en de Airflow Databricks-providerpakketten.
- Hiermee maakt u een
airflow/dags
map. Airflow gebruikt dedags
map om DAG-definities op te slaan. - Initialiseert een SQLite-database die Door Airflow wordt gebruikt om metagegevens bij te houden. In een productie-Airflow-implementatie configureert u Airflow met een standaarddatabase. De SQLite-database en de standaardconfiguratie voor uw Airflow-implementatie worden geïnitialiseerd in de
airflow
map. - Hiermee maakt u een beheerdergebruiker voor Airflow.
Tip
Voer de volgende opdracht uit in de installatiemap van Airflow om de installatie van de Databricks-provider te bevestigen:
airflow providers list
De Airflow-webserver en planner starten
De Airflow-webserver is vereist om de gebruikersinterface van Airflow weer te geven. Als u de webserver wilt starten, opent u een terminal in de installatiemap van Airflow en voert u de volgende opdrachten uit:
Notitie
Als de Airflow-webserver niet kan worden gestart vanwege een poortconflict, kunt u de standaardpoort in de Airflow-configuratie wijzigen.
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
De scheduler is het Airflow-onderdeel waarmee DAG's worden gepland. Als u de scheduler wilt starten, opent u een nieuwe terminal in de installatiemap van Airflow en voert u de volgende opdrachten uit:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
De airflow-installatie testen
Als u de Airflow-installatie wilt controleren, kunt u een van de voorbeeld-DAG's uitvoeren die zijn opgenomen in Airflow:
- Open
http://localhost:8080/home
in een browser window. Meld u aan bij de gebruikersinterface van Airflow met de gebruikersnaam en het wachtwoord die u hebt gemaakt bij de installatie van Airflow. De pagina Airflow-DAG's wordt weergegeven. - Klik op de wisselknop DAG onderbreken/opheffen om een van de voorbeeld-DAG's, bijvoorbeeld de
example_python_operator
. - Activeer het voorbeeld van DAG door te klikken op de knop DAG activeren.
- Klik op de DAG-naam om details weer te geven, inclusief de uitvoeringsstatus van de DAG.
Een persoonlijk toegangstoken voor Azure Databricks maken voor Airflow
Airflow maakt verbinding met Databricks met behulp van een persoonlijk toegangstoken (PAT) van Azure Databricks. Als u een PAT wilt maken, volgt u de stappen in persoonlijke toegangstokens van Azure Databricks voor werkruimtegebruikers.
Notitie
Als best practice voor beveiliging, wanneer u zich verifieert met geautomatiseerde hulpprogramma's, systemen, scripts en apps, raadt Databricks u aan om persoonlijke toegangstokens te gebruiken die behoren tot service-principals in plaats van werkruimtegebruikers. Zie Tokens voor een service-principal beheren om tokens voor service-principals te maken.
U kunt zich ook verifiëren bij Azure Databricks met behulp van een Microsoft Entra ID-token. Zie Databricks Connection in de airflow-documentatie.
Een Azure Databricks-verbinding configureren
Uw Airflow-installatie bevat een standaardverbinding voor Azure Databricks. Gebruik het persoonlijke toegangstoken dat u hierboven hebt gemaakt om verbinding te maken met uw werkruimte via de update verbinding:
- In een browser window, open
http://localhost:8080/connection/list/
. Als u wordt gevraagd u aan te melden, voert u de gebruikersnaam en het wachtwoord van uw beheerder in. - Zoek onder Conn-id databricks_default en klik op de knop Record bewerken.
- Vervang de waarde in het veld Host door de naam van het werkruimte-exemplaar van uw Azure Databricks-implementatie,
https://adb-123456789.cloud.databricks.com
bijvoorbeeld. - Voer in het veld Wachtwoord uw persoonlijke toegangstoken van Azure Databricks in.
- Klik op Opslaan.
Als u een Microsoft Entra ID-token gebruikt, raadpleegt u Databricks Connection in de Airflow-documentatie voor informatie over het configureren van verificatie.
Voorbeeld: Een Airflow DAG maken om een Azure Databricks-taak uit te voeren
In het volgende voorbeeld ziet u hoe u een eenvoudige Airflow-implementatie maakt die wordt uitgevoerd op uw lokale computer en hoe u een voorbeeld-DAG implementeert om uitvoeringen in Azure Databricks te activeren. In dit voorbeeld gaat u het volgende doen:
- Maak een nieuw notitieblok en voeg code toe om een begroeting af te drukken op basis van een geconfigureerde parameter.
- Maak een Azure Databricks-taak met één taak waarmee het notebook wordt uitgevoerd.
- Configureer een Airflow-verbinding met uw Azure Databricks-werkruimte.
- Maak een Airflow DAG om de notebooktaak te activeren. U definieert de DAG in een Python-script met behulp van
DatabricksRunNowOperator
. - Gebruik de Airflow-gebruikersinterface om de DAG te activeren en de uitvoeringsstatus weer te geven.
Een notebook maken
In dit voorbeeld wordt een notebook met twee cellen gebruikt:
- De eerste cel bevat een tekstwidget voor Databricks Utilities het definiëren van een variabele met de naam
greeting
set de standaardwaardeworld
. - In de tweede cel wordt de waarde afgedrukt van de
greeting
variabele die wordt voorafgegaan doorhello
.
Ga als volgende te werk om het notebook te maken:
Ga naar uw Azure Databricks-werkruimte, klik op Nieuwe in de zijbalk en selectNotebook.
Geef uw notitieblok een naam, zoals
Hello Airflow- , en zorg ervoor dat de standaardtaal isom Python-te . Kopieer de volgende Python-code en plak deze in de eerste cel van het notebook.
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
Voeg een nieuwe cel toe onder de eerste cel en kopieer en plak de volgende Python-code in de nieuwe cel:
print("hello {}".format(greeting))
Een taak maken
Klik op Werkstromen in de zijbalk.
Klik op .
Het tabblad Taken wordt weergegeven met het dialoogvenster Taak maken.
Vervang Een naam voor uw taak toevoegen... door uw taaknaam.
Voer in het veld Taaknaam een naam in voor de taak, bijvoorbeeld begroetingstaak.
In de vervolgkeuzelijst Type, selectNotebook.
In de vervolgkeuzelijst Bron, selectWerkruimte.
Klik op het tekstvak Pad en gebruik de bestandsbrowser om het notitieblok te zoeken dat u hebt gemaakt, klik op de naam van het notitieblok en klik op Bevestigen.
Klik op Toevoegen onder Parameters. Voer in het veld Sleutel de waarde in
greeting
. Voer in het veld WaardeAirflow user
in.Klik op Taak maken.
Kopieer in het deelvenster Taakdetails de waarde van de taak-id . Deze waarde is vereist om de taak vanuit Airflow te activeren.
De taak uitvoeren
Als u uw nieuwe taak wilt testen in de gebruikersinterface van Azure Databricks-taken, klikt u in de rechterbovenhoek. Wanneer de uitvoering is voltooid, kunt u de uitvoer controleren door de details van de taakuitvoering weer te geven.
Een nieuwe Airflow DAG maken
U definieert een Airflow DAG in een Python-bestand. Een DAG maken om de voorbeeldnotitiebloktaak te activeren:
Maak in een teksteditor of IDE een nieuw bestand met de naam
databricks_dag.py
met de volgende inhoud:from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
Vervang door
JOB_ID
de waarde van de taak-id die u eerder hebt opgeslagen.Sla het bestand op in de
airflow/dags
map. Airflow leest en installeert AUTOMATISCH DAG-bestanden die zijn opgeslagen inairflow/dags/
.
De DAG in Airflow installeren en controleren
De DAG activeren en verifiëren in de gebruikersinterface van Airflow:
- Open
http://localhost:8080/home
in een browser window. Het scherm Airflow DAG's wordt weergegeven. - Zoek
databricks_dag
en klik op de wisselknop DAG onderbreken/opheffen om de DAG ongedaan te maken. - Activeer dag door op de knop DAG activeren te klikken.
- Klik op een run in de Runscolumn om de status en details van de run weer te geven.