Spuštění kanálu Delta Live Tables v pracovním postupu
Kanál Delta Live Tables můžete spustit jako součást pracovního postupu zpracování dat pomocí úloh Databricks, Apache Airflow nebo Azure Data Factory.
Úlohy
V úloze Databricks můžete orchestrovat více úloh za účelem implementace pracovního postupu zpracování dat. Pokud chcete do úlohy zahrnout pipeline Delta Live Tables, použijte úlohu Pipeline při vytváření úlohy. Vizte úlohu kanálu Delta Live Tables v rámci úloh.
Apache Airflow
Apache Airflow je opensourcové řešení pro správu a plánování pracovních postupů dat. Airflow představuje pracovní postupy jako řízené acyklické grafy (DAG) operací. Pracovní postup definujete v souboru Pythonu a Airflow spravuje plánování a provádění. Informace o instalaci a používání Airflow s Azure Databricks najdete v tématu Orchestrace úloh Azure Databricks pomocí Apache Airflow.
Pokud chcete spustit kanál Delta Live Tables jako součást pracovního postupu Airflow, použijte DatabricksSubmitRunOperator.
Požadavky
K použití podpory Airflow pro Delta Live Tables jsou vyžadovány následující věci:
- Airflow verze 2.1.0 nebo novější.
- Balíček zprostředkovatele Databricks verze 2.1.0 nebo novější.
Příklad
Následující příklad vytvoří DAG Airflow, který spouští aktualizaci pipeline Delta Live Tables s identifikátorem 8279d543-063c-4d63-9926-dae38e35ce8b
:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Nahraďte CONNECTION_ID
identifikátorem připojení Airflow k vašemu pracovnímu prostoru.
Uložte tento příklad do airflow/dags
adresáře a pomocí uživatelského rozhraní Airflow zobrazte a aktivujte DAG. K zobrazení podrobností o aktualizaci datového kanálu použijte uživatelské rozhraní Delta Live Tables.
Azure Data Factory
Poznámka:
Delta Live Tables a Azure Data Factory zahrnují možnosti konfigurace počtu opakování v případě chyby. Pokud jsou hodnoty opakování nakonfigurovány v kanálu Delta Live Tables a a v aktivitě služby Azure Data Factory, která kanál volá, je počet opakování součinem hodnoty opakování z Azure Data Factory a hodnoty opakování z Delta Live Tables.
Pokud například aktualizace kanálu selže, Delta Live Tables ve výchozím nastavení aktualizaci opakuje až pětkrát. Pokud je opakování služby Azure Data Factory nastaveno na tři a pipeline Delta Live Tables používá výchozí nastavení pěti opakování, může se neúspěšná pipeline Delta Live Tables opakovat až patnáctkrát. Aby nedocházelo k nadměrným pokusům o opakování při selhání aktualizací kanálu, doporučuje Databricks omezit počet opakování při konfiguraci kanálu Delta Live Tables nebo aktivity azure Data Factory, která kanál volá.
Pokud chcete změnit konfiguraci opakování kanálu Delta Live Tables, při konfiguraci kanálu použijte nastavení pipelines.numUpdateRetryAttempts
.
Azure Data Factory je cloudová služba ETL, která umožňuje orchestraci pracovních postupů integrace a transformace dat. Azure Data Factory přímo podporuje spouštění úloh Azure Databricks v pracovním postupu, včetně poznámkových bloků, úloh JAR a skriptů Pythonu. Kanál můžete zahrnout také do pracovního postupu voláním rozhraní API
Vytvořte datovou továrnu nebo otevřete existující datovou továrnu.
Po dokončení vytváření otevřete stránku datové továrny a klikněte na dlaždici Otevřít Azure Data Factory Studio . Zobrazí se uživatelské rozhraní služby Azure Data Factory.
Nový kanál Azure Data Factory vytvoříte tak , že v uživatelském rozhraní azure Data Factory Studio vyberete kanál z rozevírací nabídky Nový .
Na panelu nástrojů Aktivity rozbalte položku Obecné a přetáhněte webovou aktivitu na plátno kanálu. Klikněte na kartu Nastavení a zadejte následující hodnoty:
Poznámka:
Osvědčeným postupem při ověřování pomocí automatizovaných nástrojů, systémů, skriptů a aplikací doporučuje Databricks místo uživatelů pracovního prostoru používat tokeny patního přístupu, které patří instančním objektům . Pokud chcete vytvořit tokeny pro instanční objekty, přečtěte si téma Správa tokenů instančního objektu.
Adresa URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Nahradit
<get-workspace-instance>
.Nahraďte
<pipeline-id>
identifikátorem kanálu.Metoda: V rozevírací nabídce vyberte POST.
Záhlaví: Klikněte na + Nový. Do textového pole Název zadejte
Authorization
. Do textového pole Hodnota zadejteBearer <personal-access-token>
.Nahraďte
<personal-access-token>
osobním přístupovým tokenem Azure Databricks.Tělo: Pokud chcete předat další parametry požadavku, zadejte dokument JSON obsahující parametry. Chcete-li například spustit aktualizaci a znovu zpracovat všechna data pro pipeline:
{"full_refresh": "true"}
. Pokud neexistují žádné další parametry požadavku, zadejte prázdné složené závorky ({}
).
Chcete-li otestovat webovou aktivitu, klepněte na tlačítko Ladit na panelu nástrojů v uživatelském rozhraní služby Data Factory. Výstup a stav spuštění, včetně chyb, se zobrazí na kartě Výstup kanálu služby Azure Data Factory. K zobrazení podrobností o aktualizaci kanálu použijte uživatelské rozhraní Delta Live Tables.
Tip
Běžným požadavkem pracovního postupu je zahájení úkolu po dokončení předchozího úkolu. Vzhledem k tomu, že požadavek na Delta Live Tables updates
je asynchronní – požadavek se vrátí po spuštění aktualizace, ale před dokončením aktualizace – musí úlohy v datovém kanálu Azure Data Factory, které jsou závislé na aktualizaci Delta Live Tables, počkat na dokončení aktualizace. Možností, jak čekat na dokončení aktualizace, je přidat aktivitu Until po webové aktivitě, která aktivuje aktualizaci Delta Live Tables. V aktivitě Until:
- Přidejte aktivitu Wait, aby po dokončení aktualizace čekala nakonfigurovaný počet sekund.
- Přidejte webovou aktivitu po aktivitě Čekání, která používá požadavek na aktualizaci Delta Live Tables k získání stavu aktualizace. Pole
state
v odpovědi vrátí aktuální stav aktualizace, včetně toho, jestli byla dokončena. - Pomocí hodnoty pole
state
nastavte ukončovací podmínku aktivity Until. Můžete také použít Nastavit aktivitu proměnné přidat proměnnou kanálu na základě hodnotystate
a použít tuto proměnnou pro ukončovací podmínku.