Sdílet prostřednictvím


Orchestrace úloh Azure Databricks pomocí Apache Airflow

Tento článek popisuje podporu Apache Airflow pro orchestraci datových kanálů pomocí Azure Databricks, obsahuje pokyny k instalaci a konfiguraci Airflow místně a poskytuje příklad nasazení a spuštění pracovního postupu Azure Databricks pomocí Airflow.

Orchestrace úloh v datovém kanálu

Vývoj a nasazení kanálu pro zpracování dat často vyžaduje správu složitých závislostí mezi úlohami. Kanál může například číst data ze zdroje, vyčistit data, transformovat vyčištěná data a zapsat transformovaná data do cíle. Potřebujete také podporu pro testování, plánování a řešení chyb při zprovoznění kanálu.

Systémy pracovních postupů tyto problémy řeší tím, že umožňují definovat závislosti mezi úlohami, naplánovat, kdy se kanály spouštějí a monitorují pracovní postupy. Apache Airflow je opensourcové řešení pro správu a plánování datových kanálů. Airflow představuje datové kanály jako řízené acyklické grafy (DAG) operací. Pracovní postup definujete v souboru Pythonu a Airflow spravuje plánování a provádění. Připojení Airflow Azure Databricks umožňuje využívat optimalizovaný modul Spark nabízený službou Azure Databricks s funkcemi plánování Airflow.

Požadavky

  • Integrace mezi Airflow a Azure Databricks vyžaduje Airflow verze 2.5.0 a novější. Příklady v tomto článku jsou testovány s Airflow verze 2.6.1.
  • Airflow vyžaduje Python 3.8, 3.9, 3.10 nebo 3.11. Příklady v tomto článku jsou testovány v Pythonu 3.8.
  • Pokyny v tomto článku k instalaci a spuštění Airflow vyžadují pipenv k vytvoření virtuálního prostředí Pythonu.

Operátoři airflow pro Databricks

Airflow DAG se skládá z úkolů, where každý úkol spouští Airflow Operátor. Operátory airflow podporující integraci do Databricks jsou implementované ve zprostředkovateli Databricks.

Poskytovatel Databricks zahrnuje operátory pro spuštění několika úloh v pracovním prostoru Azure Databricks, včetně importu dat do table, spouštění dotazů SQLa práce s složkami Git Databricks.

Zprostředkovatel Databricks implementuje dva operátory pro aktivaci úloh:

  • DatabricksRunNowOperator vyžaduje existující úlohu Azure Databricks a k aktivaci spuštění používá požadavek ROZHRANÍ API POST /api/2.1/jobs/run-now. Databricks doporučuje použít, DatabricksRunNowOperator protože snižuje duplicitu definic úloh a spuštění úloh aktivovaná tímto operátorem najdete v uživatelském rozhraní úloh.
  • DatabricksSubmitRunOperator nevyžaduje, aby v Azure Databricks existovala úloha a používá post /api/2.1/jobs/run/submit API požadavek k odeslání specifikace úlohy a aktivaci spuštění.

Pokud chcete vytvořit novou úlohu Azure Databricks nebo reset existující úlohu, poskytovatel Databricks implementuje DatabricksCreateJobsOperator. DatabricksCreateJobsOperator používá požadavky rozhraní API POST /api/2.1/jobs/create a POST /api/2.1/jobs/reset. Pomocí této možnosti DatabricksCreateJobsOperatorDatabricksRunNowOperator můžete vytvořit a spustit úlohu.

Poznámka:

Použití operátorů Databricks k aktivaci úlohy vyžaduje poskytnutí credentials v konfiguraci připojení Databricks. Viz Vytvoření tokenu pat Azure Databricks pro Airflow.

Operátoři Airflow Databricks zapisují adresu URL stránky spuštění úlohy do protokolů Airflow každých polling_period_seconds (výchozí hodnota je 30 sekund). Další informace najdete na stránce balíčku apache-airflow-providers-databricks na webu Airflow.

Místní instalace integrace Azure Databricks s Airflow

Pokud chcete nainstalovat Airflow a poskytovatele Databricks místně pro účely testování a vývoje, postupujte následovně. Další možnosti instalace Airflow, včetně vytvoření produkční instalace, najdete v dokumentaci k Airflow.

Otevřete terminál a spusťte následující příkazy:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Nahraďte <firstname>a <lastname><email> zadejte svoje uživatelské jméno a e-mail. Zobrazí se výzva k zadání hesla pro uživatele správce. Nezapomeňte toto heslo uložit, protože je nutné se přihlásit k uživatelskému rozhraní Airflow.

Tento skript provede následující kroky:

  1. Vytvoří adresář pojmenovaný airflow a změní se do daného adresáře.
  2. Používá pipenv se k vytvoření a vytvoření virtuálního prostředí Pythonu. Databricks doporučuje používat virtuální prostředí Pythonu k izolaci verzí balíčků a závislostí kódu do daného prostředí. Tato izolace pomáhá snížit neočekávané neshody verzí balíčků a kolize závislostí kódu.
  3. Inicializuje proměnnou prostředí s názvem AIRFLOW_HOMEset na cestu k adresáři airflow.
  4. Nainstaluje Airflow a balíčky poskytovatele Databricks Airflow.
  5. airflow/dags Vytvoří adresář. Airflow používá dags adresář k ukládání definic DAG.
  6. Inicializuje databázi SQLite, kterou Airflow používá ke sledování metadat. V produkčním nasazení Airflow byste nakonfigurovali Airflow se standardní databází. Databáze SQLite a výchozí konfigurace pro nasazení Airflow se inicializují v airflow adresáři.
  7. Vytvoří uživatele správce pro Airflow.

Tip

Pokud chcete potvrdit instalaci poskytovatele Databricks, spusťte v instalačním adresáři Airflow následující příkaz:

airflow providers list

Spuštění webového serveru a plánovače Airflow

Webový server Airflow se vyžaduje k zobrazení uživatelského rozhraní Airflow. Pokud chcete spustit webový server, otevřete terminál v instalačním adresáři Airflow a spusťte následující příkazy:

Poznámka:

Pokud se webovému serveru Airflow nepodaří spustit kvůli konfliktu portů, můžete změnit výchozí port v konfiguraci Airflow.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

Plánovač je komponenta Airflow, která plánuje DAG. Pokud chcete spustit plánovač, otevřete nový terminál v instalačním adresáři Airflow a spusťte následující příkazy:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Otestování instalace Airflow

Pokud chcete ověřit instalaci Airflow, můžete spustit jednu z ukázkových DAG, které jsou součástí Airflow:

  1. V prohlížeči windowotevřete http://localhost:8080/home. Přihlaste se k uživatelskému rozhraní Airflow pomocí uživatelského jména a hesla, které jste vytvořili při instalaci Airflow. Zobrazí se stránka DAG airflow.
  2. Kliknutím na přepínač Pozastavit/Zrušit zapůjení daG odblokujte některou z ukázkových dag, například .example_python_operator
  3. Spusťte ukázku DAG kliknutím na tlačítko DaG triggeru.
  4. Kliknutím na název DAG zobrazíte podrobnosti, včetně stavu spuštění dag.

Vytvoření tokenu pat pro Azure Databricks pro Airflow

Airflow se připojuje k Databricks pomocí tokenu PAT (Personal Access Token) Azure Databricks. Pokud chcete vytvořit pat, postupujte podle kroků v tokenech pat pro uživatele pracovního prostoru v Azure Databricks.

Poznámka:

Osvědčeným postupem při ověřování pomocí automatizovaných nástrojů, systémů, skriptů a aplikací doporučuje Databricks místo uživatelů pracovního prostoru používat tokeny patního přístupu, které patří instančním objektům . Pokud chcete vytvořit tokeny pro instanční objekty, přečtěte si téma Správa tokenů instančního objektu.

Můžete se také ověřit v Azure Databricks pomocí tokenu ID Microsoft Entra. Viz dokumentace k Připojení Databricks v dokumentaci k Airflow.

Konfigurace připojení Azure Databricks

Instalace Airflow obsahuje výchozí připojení pro Azure Databricks. Pokud chcete update připojení pro připojení k pracovnímu prostoru pomocí tokenu PAT, který jste vytvořili výše:

  1. V prohlížeči windowotevřete http://localhost:8080/connection/list/. Pokud se zobrazí výzva k přihlášení, zadejte uživatelské jméno a heslo správce.
  2. V části Conn ID vyhledejte databricks_default a klikněte na tlačítko Upravit záznam.
  3. Hodnotu v poli Hostitel nahraďte názvem instance pracovního prostoru vašeho nasazení Azure Databricks, https://adb-123456789.cloud.databricks.comnapříklad .
  4. Do pole Heslo zadejte osobní přístupový token Azure Databricks.
  5. Klikněte na Uložit.

Pokud používáte token Microsoft Entra ID, informace o konfiguraci ověřování najdete v dokumentaci ke službě Airflow v tématu Připojení Databricks.

Příklad: Vytvoření DAG Airflow pro spuštění úlohy Azure Databricks

Následující příklad ukazuje, jak vytvořit jednoduché nasazení Airflow, které běží na místním počítači, a nasadí ukázkovou sadu DAG pro aktivaci spuštění v Azure Databricks. V tomto příkladu:

  1. Vytvořte nový poznámkový blok a přidejte kód pro tisk pozdravu na základě nakonfigurovaného parametru.
  2. Vytvořte úlohu Azure Databricks s jednou úlohou, která spustí poznámkový blok.
  3. Nakonfigurujte připojení Airflow k pracovnímu prostoru Azure Databricks.
  4. Vytvořte DAG Airflow, který aktivuje úlohu poznámkového bloku. DaG definujete ve skriptu Pythonu pomocí DatabricksRunNowOperator.
  5. Pomocí uživatelského rozhraní Airflow aktivujte DAG a zobrazte stav spuštění.

Vytvoření poznámkového bloku

V tomto příkladu se používá poznámkový blok obsahující dvě buňky:

  • První buňka obsahuje textový widget nástroje Databricks definující proměnnou pojmenovanou greetingset s výchozí hodnotou world.
  • Druhá buňka vytiskne hodnotu greeting proměnné s předponou hello.

Vytvoření poznámkového bloku:

  1. Přejděte do pracovního prostoru Azure Databricks, klikněte na Nová ikonaNový na bočním panelu a selectpoznámkový blok.

  2. Dejte vašemu notebooku jméno, například Hello Airflow, a ujistěte se, že výchozím jazykem je set až Python.

  3. Zkopírujte následující kód Pythonu a vložte ho do první buňky poznámkového bloku.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Přidejte novou buňku pod první buňku a zkopírujte do nové buňky následující kód Pythonu:

    print("hello {}".format(greeting))
    

Vytvoření úlohy

  1. Na bočním panelu klikněte na Ikona Pracovních postupůPracovní postupy.

  2. Klikněte na Tlačítko Vytvořit úlohu.

    Zobrazí se karta Úkoly s dialogovým oknem vytvořit úkol.

    Dialogové okno Vytvořit první úkol

  3. Nahraďte název vaší úlohy... názvem vaší úlohy.

  4. Do pole Název úkolu zadejte název úkolu, například pozdrav-úkol.

  5. V rozevírací nabídce TypselectNotebook.

  6. V rozevírací nabídce SourceselectWorkspace.

  7. Klikněte na textové pole Cesta a v prohlížeči souborů vyhledejte poznámkový blok, který jste vytvořili, klikněte na název poznámkového bloku a klikněte na Potvrdit.

  8. Klikněte na Přidat pod Parameters. Do pole Klíč zadejte greeting. Do pole Hodnota zadejte Airflow user.

  9. Klikněte na Vytvořit úkol.

Na panelu Podrobnosti úlohy zkopírujte hodnotu ID úlohy. Tato hodnota se vyžaduje k aktivaci úlohy z Airflow.

Spuštění úlohy

Pokud chcete novou úlohu otestovat v uživatelském rozhraní úloh Azure Databricks, klikněte Tlačítko Spustit nyní v pravém horním rohu. Po dokončení spuštění můžete výstup ověřit zobrazením podrobností o spuštění úlohy.

Vytvoření nového DAG Airflow

V souboru Pythonu definujete DAG Airflow. Vytvoření DAG pro aktivaci ukázkové úlohy poznámkového bloku:

  1. V textovém editoru nebo integrovaném vývojovém prostředí vytvořte nový soubor s názvem databricks_dag.py s následujícím obsahem:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Nahraďte JOB_ID hodnotou ID úlohy, které jste uložili dříve.

  2. Uložte soubor do airflow/dags adresáře. Airflow automaticky čte a instaluje soubory DAG uložené v airflow/dags/.

Instalace a ověření DAG v Airflow

Aktivace a ověření DAG v uživatelském rozhraní Airflow:

  1. V prohlížeči windowotevřete http://localhost:8080/home. Zobrazí se obrazovka DAG Airflow.
  2. Vyhledejte databricks_dag a klikněte na přepínač Pozastavit nebo zrušit pozastavení DAG a zrušte tak pozastavení DAG .
  3. Spusťte DAG kliknutím na tlačítko DaG triggeru.
  4. Kliknutím na běh v Běhycolumn k zobrazení stavu a podrobností běhu.