Sdílet prostřednictvím


Spustit DLT pipeline v pracovním postupu

Kanál DLT můžete spustit jako součást pracovního postupu zpracování dat pomocí úloh Databricks, Apache Airflow nebo Azure Data Factory.

Pracovní místa

V úloze Databricks můžete orchestrovat více úloh za účelem implementace pracovního postupu zpracování dat. Pokud chcete do úlohy zahrnout kanál DLT, při vytváření úlohy použijte úlohu Pipeline. Viz úloha kanálu DLT pro úlohy.

Apache Airflow

Apache Airflow je opensourcové řešení pro správu a plánování pracovních postupů dat. Airflow představuje pracovní postupy jako řízené acyklické grafy (DAG) operací. Pracovní postup definujete v souboru Pythonu a Airflow spravuje plánování a provádění. Informace o instalaci a používání Airflow s Azure Databricks najdete v tématu Orchestrace úloh Azure Databricks pomocí apache Airflow.

Pro spuštění DLT kanálu jako součásti pracovního postupu Airflow použijte DatabricksSubmitRunOperator.

Požadavky

K použití podpory Airflow pro DLT jsou potřeba následující:

  • Airflow verze 2.1.0 nebo novější.
  • Poskytovatel balíčku Databricks verze 2.1.0 nebo novější.

Příklad

Následující příklad vytvoří DAG Airflow, který aktivuje aktualizaci kanálu DLT s identifikátorem 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Nahraďte CONNECTION_ID identifikátorem připojení Airflow k vašemu pracovnímu prostoru.

Uložte tento příklad do adresáře airflow/dags a využijte uživatelské rozhraní Airflow k zobrazení a spuštění DAG. K zobrazení podrobností o aktualizaci pipeline použijte uživatelské rozhraní DLT.

Azure Data Factory

Poznámka

Knihovna DLT a Azure Data Factory zahrnují možnosti konfigurace počtu opakování v případě selhání. Pokud jsou hodnoty opakování nakonfigurované v kanálu DLT a v aktivitě služby Azure Data Factory, která kanál volá, je počet opakování služby Azure Data Factory vynásobenou hodnotou opakování DLT.

Pokud například aktualizace kanálu selže, DLT ve výchozím nastavení opakuje aktualizaci až pětkrát. Pokud je opakování služby Azure Data Factory nastavené na tři a váš kanál DLT používá výchozí nastavení pěti opakování, může se neúspěšný kanál DLT opakovat až patnáctkrát. Aby nedocházelo k nadměrným pokusům o opakování při selhání aktualizací kanálu, doporučuje Databricks omezit počet opakování při konfiguraci kanálu DLT nebo aktivity služby Azure Data Factory, která kanál volá.

Pokud chcete změnit konfiguraci opakování kanálu DLT, při konfiguraci kanálu použijte nastavení pipelines.numUpdateRetryAttempts.

Azure Data Factory je cloudová služba ETL, která umožňuje orchestraci pracovních postupů integrace a transformace dat. Azure Data Factory přímo podporuje spouštění úloh Azure Databricks v pracovním postupu, včetně poznámkových bloků, úloh JAR a skriptů Pythonu. Kanál můžete zahrnout také do pracovního postupu voláním rozhraní API DLT z webové aktivity služby Azure Data Factory. Pokud například chcete spustit aktualizaci pipeline ze služby Azure Data Factory:

  1. Vytvoření datové továrny nebo otevření existující datové továrny.

  2. Po dokončení procesu vytváření otevřete stránku vaší datové továrny a klikněte na dlaždici Otevřít Azure Data Factory Studio. Zobrazí se uživatelské rozhraní služby Azure Data Factory.

  3. Vytvořte nové potrubí služby Azure Data Factory tak, že v uživatelském rozhraní Azure Data Factory Studio vyberete Pipeline z rozevírací nabídky Nový.

  4. Na panelu nástrojů Aktivity rozbalte Obecné a přetáhněte aktivitu Web na plátno kanálu. Klikněte na kartu Nastavení a zadejte následující hodnoty:

    Poznámka

    Osvědčeným postupem při ověřování pomocí automatizovaných nástrojů, systémů, skriptů a aplikací doporučuje Databricks používat osobní přístupové tokeny, které patří instančním objektům místo uživatelů pracovního prostoru. Informace o vytváření tokenů pro instanční objekty najdete v tématu Správa tokenů pro instanční objekt.

    • adresa URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Nahraďte <get-workspace-instance>.

      Nahraďte <pipeline-id> identifikátorem kanálu.

    • Metoda: V rozevírací nabídce vyberte POST.

    • záhlaví: Klikněte na + Nový. Do textového pole Název zadejte Authorization. Do textového pole Hodnota zadejte Bearer <personal-access-token>.

      Nahraďte <personal-access-token> osobním přístupovým tokenem Azure Databricks.

    • Část: Pokud chcete předat další parametry požadavku, zadejte dokument JSON obsahující parametry. Chcete-li například spustit aktualizaci a znovu zpracovat všechna data pro kanál: {"full_refresh": "true"}. Pokud neexistují žádné další parametry požadavku, zadejte prázdné složené závorky ({}).

Chcete-li otestovat webovou aktivitu, klikněte na Ladění na panelu nástrojů datového kanálu v uživatelském rozhraní služby Data Factory. Výstup a stav spuštění, včetně chyb, se zobrazují na kartě Výstup v pipelině Azure Data Factory. Pomocí uživatelského rozhraní DLT zobrazte podrobnosti o aktualizaci datového kanálu.

Spropitné

Běžným požadavkem pracovního postupu je zahájení úkolu po dokončení předchozího úkolu. Vzhledem k tomu, že požadavek DLT updates je asynchronní – požadavek se vrátí po spuštění aktualizace, ale před dokončením aktualizace – úlohy v kanálu služby Azure Data Factory se závislostí na aktualizaci DLT musí počkat na dokončení aktualizace. Možnost čekat na dokončení aktualizace znamená přidání aktivity Until za webovou aktivitu, která spouští aktualizaci DLT. V aktivitě Until:

  1. Přidejte aktivitu Wait, aby po dokončení aktualizace čekala nakonfigurovaný počet sekund.
  2. Přidejte webovou aktivitu za aktivitou Čekání, která používá požadavek na podrobnosti aktualizace DLT k získání stavu aktualizace. Pole state v odpovědi vrátí aktuální stav aktualizace, včetně toho, jestli byla dokončena.
  3. Pomocí hodnoty pole state nastavte ukončovací podmínku aktivity Until. Můžete také použít aktivitu Nastavit proměnnou k přidání proměnné potrubí na základě hodnoty state a tuto proměnnou použít pro ukončovací podmínku.