Delen via


Azure Databricks-taken organiseren met Apache Airflow

In dit artikel wordt de Apache Airflow-ondersteuning beschreven voor het organiseren van gegevenspijplijnen met Azure Databricks, bevat instructies voor het lokaal installeren en configureren van Airflow en biedt een voorbeeld van het implementeren en uitvoeren van een Azure Databricks-werkstroom met Airflow.

Taakindeling in een gegevenspijplijn

Het ontwikkelen en implementeren van een pijplijn voor gegevensverwerking vereist vaak het beheren van complexe afhankelijkheden tussen taken. Een pijplijn kan bijvoorbeeld gegevens lezen uit een bron, de gegevens opschonen, de opgeschoonde gegevens transformeren en de getransformeerde gegevens naar een doel schrijven. U hebt ook ondersteuning nodig voor het testen, plannen en oplossen van fouten bij het operationeel maken van een pijplijn.

Werkstroomsystemen pakken deze uitdagingen aan door u in staat te stellen afhankelijkheden tussen taken te definiëren, te plannen wanneer pijplijnen worden uitgevoerd en werkstromen te bewaken. Apache Airflow is een opensource-oplossing voor het beheren en plannen van gegevenspijplijnen. Airflow vertegenwoordigt gegevenspijplijnen als omgeleide acyclische grafieken (DAG's) van bewerkingen. U definieert een werkstroom in een Python-bestand en Airflow beheert de planning en uitvoering. Met de Airflow Azure Databricks-verbinding kunt u profiteren van de geoptimaliseerde Spark-engine die door Azure Databricks wordt aangeboden met de planningsfuncties van Airflow.

Vereisten

  • Voor de integratie tussen Airflow en Azure Databricks is Airflow versie 2.5.0 en hoger vereist. De voorbeelden in dit artikel worden getest met Airflow versie 2.6.1.
  • Airflow vereist Python 3.8, 3.9, 3.10 of 3.11. De voorbeelden in dit artikel worden getest met Python 3.8.
  • Voor de instructies in dit artikel voor het installeren en uitvoeren van Airflow is pipenv vereist om een virtuele Python-omgeving te maken.

Airflow-operators voor Databricks

Een Airflow DAG bestaat uit taken, where elke taak een Airflow Operatoruitvoert. Airflow-operators die de integratie met Databricks ondersteunen, worden geïmplementeerd in de Databricks-provider.

De Databricks-provider bevat operators voor het uitvoeren van een aantal taken voor een Azure Databricks-werkruimte, waaronder het importeren van gegevens in een table, het uitvoeren van SQL-query'sen het werken met Databricks Git-mappen.

De Databricks-provider implementeert twee operators voor het activeren van taken:

Als u een nieuwe Azure Databricks-taak of reset een bestaande taak wilt maken, implementeert de Databricks-provider de DatabricksCreateJobsOperator. De DatabricksCreateJobsOperator gebruikt de POST /api/2.1/jobs/create en POST /api/2.1/jobs/reset API-aanvragen. U kunt de DatabricksCreateJobsOperator functie gebruiken om DatabricksRunNowOperator een taak te maken en uit te voeren.

Notitie

Als u de Databricks-operators gebruikt om een taak te activeren, moet u credentials bieden in de configuratie van de Databricks-verbinding. Zie Een persoonlijk toegangstoken voor Azure Databricks maken voor Airflow.

De Databricks Airflow-operators schrijven de URL van de taakuitvoeringspagina naar de Airflow-logboeken elke polling_period_seconds (de standaardwaarde is 30 seconden). Zie de Apache-Airflow-providers-Databricks-pakketpagina op de Airflow-website voor meer informatie.

De integratie van Airflow Azure Databricks lokaal installeren

Gebruik de volgende stappen om Airflow en de Databricks-provider lokaal te installeren voor testen en ontwikkelen. Zie de installatie in de Airflow-documentatie voor andere installatieopties, waaronder het maken van een productie-installatie.

Open een terminal en voer de volgende opdrachten uit:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Vervang <firstname>, <lastname>en <email> door uw gebruikersnaam en e-mailadres. U wordt gevraagd een wachtwoord in te voeren voor de gebruiker met beheerdersrechten. Zorg ervoor dat u dit wachtwoord opslaat omdat het is vereist om u aan te melden bij de gebruikersinterface van Airflow.

Met dit script worden de volgende stappen uitgevoerd:

  1. Hiermee maakt u een map met de naam airflow en wijzigingen in die map.
  2. Hiermee pipenv maakt en spawt u een virtuele Python-omgeving. Databricks raadt aan om een virtuele Python-omgeving te gebruiken om pakketversies en codeafhankelijkheden voor die omgeving te isoleren. Deze isolatie helpt onverwachte niet-overeenkomende pakketversies en codeafhankelijkheidsconflicten te verminderen.
  3. Initialiseert een omgevingsvariabele met de naam AIRFLOW_HOMEset met het pad van de airflow directory.
  4. Installeert Airflow en de Airflow Databricks-providerpakketten.
  5. Hiermee maakt u een airflow/dags map. Airflow gebruikt de dags map om DAG-definities op te slaan.
  6. Initialiseert een SQLite-database die Door Airflow wordt gebruikt om metagegevens bij te houden. In een productie-Airflow-implementatie configureert u Airflow met een standaarddatabase. De SQLite-database en de standaardconfiguratie voor uw Airflow-implementatie worden geïnitialiseerd in de airflow map.
  7. Hiermee maakt u een beheerdergebruiker voor Airflow.

Tip

Voer de volgende opdracht uit in de installatiemap van Airflow om de installatie van de Databricks-provider te bevestigen:

airflow providers list

De Airflow-webserver en planner starten

De Airflow-webserver is vereist om de gebruikersinterface van Airflow weer te geven. Als u de webserver wilt starten, opent u een terminal in de installatiemap van Airflow en voert u de volgende opdrachten uit:

Notitie

Als de Airflow-webserver niet kan worden gestart vanwege een poortconflict, kunt u de standaardpoort in de Airflow-configuratie wijzigen.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

De scheduler is het Airflow-onderdeel waarmee DAG's worden gepland. Als u de scheduler wilt starten, opent u een nieuwe terminal in de installatiemap van Airflow en voert u de volgende opdrachten uit:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

De airflow-installatie testen

Als u de Airflow-installatie wilt controleren, kunt u een van de voorbeeld-DAG's uitvoeren die zijn opgenomen in Airflow:

  1. Open http://localhost:8080/homein een browser window. Meld u aan bij de gebruikersinterface van Airflow met de gebruikersnaam en het wachtwoord die u hebt gemaakt bij de installatie van Airflow. De pagina Airflow-DAG's wordt weergegeven.
  2. Klik op de wisselknop DAG onderbreken/opheffen om een van de voorbeeld-DAG's, bijvoorbeeld de example_python_operator.
  3. Activeer het voorbeeld van DAG door te klikken op de knop DAG activeren.
  4. Klik op de DAG-naam om details weer te geven, inclusief de uitvoeringsstatus van de DAG.

Een persoonlijk toegangstoken voor Azure Databricks maken voor Airflow

Airflow maakt verbinding met Databricks met behulp van een persoonlijk toegangstoken (PAT) van Azure Databricks. Als u een PAT wilt maken, volgt u de stappen in persoonlijke toegangstokens van Azure Databricks voor werkruimtegebruikers.

Notitie

Als best practice voor beveiliging, wanneer u zich verifieert met geautomatiseerde hulpprogramma's, systemen, scripts en apps, raadt Databricks u aan om persoonlijke toegangstokens te gebruiken die behoren tot service-principals in plaats van werkruimtegebruikers. Zie Tokens voor een service-principal beheren om tokens voor service-principals te maken.

U kunt zich ook verifiëren bij Azure Databricks met behulp van een Microsoft Entra ID-token. Zie Databricks Connection in de airflow-documentatie.

Een Azure Databricks-verbinding configureren

Uw Airflow-installatie bevat een standaardverbinding voor Azure Databricks. Gebruik het persoonlijke toegangstoken dat u hierboven hebt gemaakt om verbinding te maken met uw werkruimte via de update verbinding:

  1. In een browser window, open http://localhost:8080/connection/list/. Als u wordt gevraagd u aan te melden, voert u de gebruikersnaam en het wachtwoord van uw beheerder in.
  2. Zoek onder Conn-id databricks_default en klik op de knop Record bewerken.
  3. Vervang de waarde in het veld Host door de naam van het werkruimte-exemplaar van uw Azure Databricks-implementatie, https://adb-123456789.cloud.databricks.combijvoorbeeld.
  4. Voer in het veld Wachtwoord uw persoonlijke toegangstoken van Azure Databricks in.
  5. Klik op Opslaan.

Als u een Microsoft Entra ID-token gebruikt, raadpleegt u Databricks Connection in de Airflow-documentatie voor informatie over het configureren van verificatie.

Voorbeeld: Een Airflow DAG maken om een Azure Databricks-taak uit te voeren

In het volgende voorbeeld ziet u hoe u een eenvoudige Airflow-implementatie maakt die wordt uitgevoerd op uw lokale computer en hoe u een voorbeeld-DAG implementeert om uitvoeringen in Azure Databricks te activeren. In dit voorbeeld gaat u het volgende doen:

  1. Maak een nieuw notitieblok en voeg code toe om een begroeting af te drukken op basis van een geconfigureerde parameter.
  2. Maak een Azure Databricks-taak met één taak waarmee het notebook wordt uitgevoerd.
  3. Configureer een Airflow-verbinding met uw Azure Databricks-werkruimte.
  4. Maak een Airflow DAG om de notebooktaak te activeren. U definieert de DAG in een Python-script met behulp van DatabricksRunNowOperator.
  5. Gebruik de Airflow-gebruikersinterface om de DAG te activeren en de uitvoeringsstatus weer te geven.

Een notebook maken

In dit voorbeeld wordt een notebook met twee cellen gebruikt:

  • De eerste cel bevat een tekstwidget voor Databricks Utilities het definiëren van een variabele met de naam greetingset de standaardwaarde world.
  • In de tweede cel wordt de waarde afgedrukt van de greeting variabele die wordt voorafgegaan door hello.

Ga als volgende te werk om het notebook te maken:

  1. Ga naar uw Azure Databricks-werkruimte, klik op Nieuw pictogramNieuwe in de zijbalk en selectNotebook.

  2. Geef uw notitieblok een naam, zoals Hello Airflow-, en zorg ervoor dat de standaardtaal is om Python-te .

  3. Kopieer de volgende Python-code en plak deze in de eerste cel van het notebook.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Voeg een nieuwe cel toe onder de eerste cel en kopieer en plak de volgende Python-code in de nieuwe cel:

    print("hello {}".format(greeting))
    

Een taak maken

  1. Klik op Pictogram WerkstromenWerkstromen in de zijbalk.

  2. Klik op Knop Taak maken.

    Het tabblad Taken wordt weergegeven met het dialoogvenster Taak maken.

    Dialoogvenster Eerste taak maken

  3. Vervang Een naam voor uw taak toevoegen... door uw taaknaam.

  4. Voer in het veld Taaknaam een naam in voor de taak, bijvoorbeeld begroetingstaak.

  5. In de vervolgkeuzelijst Type, selectNotebook.

  6. In de vervolgkeuzelijst Bron, selectWerkruimte.

  7. Klik op het tekstvak Pad en gebruik de bestandsbrowser om het notitieblok te zoeken dat u hebt gemaakt, klik op de naam van het notitieblok en klik op Bevestigen.

  8. Klik op Toevoegen onder Parameters. Voer in het veld Sleutel de waarde in greeting. Voer in het veld WaardeAirflow user in.

  9. Klik op Taak maken.

Kopieer in het deelvenster Taakdetails de waarde van de taak-id . Deze waarde is vereist om de taak vanuit Airflow te activeren.

De taak uitvoeren

Als u uw nieuwe taak wilt testen in de gebruikersinterface van Azure Databricks-taken, klikt u Knop Nu uitvoeren in de rechterbovenhoek. Wanneer de uitvoering is voltooid, kunt u de uitvoer controleren door de details van de taakuitvoering weer te geven.

Een nieuwe Airflow DAG maken

U definieert een Airflow DAG in een Python-bestand. Een DAG maken om de voorbeeldnotitiebloktaak te activeren:

  1. Maak in een teksteditor of IDE een nieuw bestand met de naam databricks_dag.py met de volgende inhoud:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Vervang door JOB_ID de waarde van de taak-id die u eerder hebt opgeslagen.

  2. Sla het bestand op in de airflow/dags map. Airflow leest en installeert AUTOMATISCH DAG-bestanden die zijn opgeslagen in airflow/dags/.

De DAG in Airflow installeren en controleren

De DAG activeren en verifiëren in de gebruikersinterface van Airflow:

  1. Open http://localhost:8080/homein een browser window. Het scherm Airflow DAG's wordt weergegeven.
  2. Zoek databricks_dag en klik op de wisselknop DAG onderbreken/opheffen om de DAG ongedaan te maken.
  3. Activeer dag door op de knop DAG activeren te klikken.
  4. Klik op een run in de Runscolumn om de status en details van de run weer te geven.