在工作流程中執行 Delta Live Tables 管線
您可以使用 Databricks 作業、Apache Airflow 或 Azure Data Factory,在數據處理工作流程中執行 Delta Live Tables 管線。
工作
您可以在 Databricks 工作中協調多個工作,以實作資料處理工作流程。 若要在作業中包含 Delta Live Tables 管線,請在創建作業時使用 管線 這項工作。 請參閱作業的
Apache Airflow
Apache Airflow 是管理及排程資料工作流程的開放原始碼解決方案。 Airflow 以有向非循環圖 (DAG) 的形式表示作業的工作流程。 您可以在 Python 檔案中定義工作流程,而 Airflow 會管理排程和執行。 如需搭配 Azure Databricks 安裝和使用 Airflow 的資訊,請參閱使用 Apache Airflow 協調 Azure Databricks 工作。
若要在 Airflow 工作流程中執行 Delta Live Tables 管線,請使用 DatabricksSubmitRunOperator。
需求
以下是使用 Delta Live Tables 的 Airflow 支援所需的必備要件:
- Airflow 2.1.0 版或更新版本。
- Databricks 提供者套件 2.1.0 版或更新版本。
範例
下列範例會建立 Airflow DAG,其會觸發具有標識碼 8279d543-063c-4d63-9926-dae38e35ce8b
之 Delta Live Tables 管線的更新:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
將
將此範例儲存在目錄中,airflow/dags
並使用 Airflow UI 來 檢視和觸發 DAG。 使用 Delta Live Tables UI 來檢視管線更新的詳細數據。
Azure Data Factory
注意
Delta Live Tables 和 Azure Data Factory 各包含選項,以設定發生失敗時重試次數。 如果您在 Delta Live Tables 管線上 設定了重試值,並且在呼叫該管線的 Azure Data Factory 活動上 也設定了重試值,那麼重試次數將是 Azure Data Factory 的重試值乘以 Delta Live Tables 的重試值。
例如,如果管線更新失敗,Delta Live Tables 預設會重試更新最多五次。 如果 Azure Data Factory 重試設定為 3,而您的 Delta Live Tables 管線會使用預設的五次重試,則失敗的 Delta Live Tables 管線可能會重試最多 15 次。 為了避免在管線更新失敗時發生過多重試嘗試,Databricks 建議在設定 Delta Live Tables 管線或呼叫管線的 Azure Data Factory 活動時限制重試次數。
若要變更 Delta Live Tables 管線的重試組態,請在設定管線時使用 pipelines.numUpdateRetryAttempts
設定。
Azure Data Factory 是雲端式 ETL 服務,可讓您協調資料整合與轉換工作流程。 Azure Data Factory 直接支援在工作流程中執行 Azure Databricks 工作,包括 筆記本、JAR 工作和 Python 指令碼。 您也可以在工作流程中包含一個管線,方法是從 Azure Data Factory 的 Web 活動中調用 Delta Live Tables 的 API。 例如,若要從 Azure Data Factory 觸發管線更新:
建立資料處理站或開啟現有的資料處理站。
建立完成時,開啟資料處理站的頁面,然後按下 [開啟 Azure Data Factory Studio] 圖格。 Azure Data Factory 使用者介面隨即出現。
從 Azure Data Factory Studio 使用者介面的 [新增] 下拉功能表中選取 [管線],以建立新的 Azure Data Factory 管線。
在 [活動] 工具箱中,展開 [一般],然後將 [網頁] 活動拖曳到管線創作區。 點選單擊 [設定] 索引標籤,然後輸入下列值:
注意
作為安全性最佳做法,當您使用自動化工具、系統、指令碼和應用程式進行驗證時,Databricks 建議您使用屬於服務主體的個人存取權杖,而不是工作區使用者。 若要建立服務主體的權杖,請參閱管理服務主體的權杖。
URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
。取代
<get-workspace-instance>
。將
<pipeline-id>
取代為管線標識碼。方法:從下拉選單中選擇 POST。
標頭:按下 [+ 新增]。 在 [使用者名稱] 文字方塊中,輸入
Authorization
。 在[值] 文字方塊中,輸入Bearer <personal-access-token>
。Body:若要傳遞其他請求參數,請輸入包含這些參數的 JSON 文件。 例如,若要啟動更新並重新處理管道的所有資料:
{"full_refresh": "true"}
。 如果沒有額外的要求參數,請輸入空白大括弧 ({}
)。
若要測試網路活動,請按下 Data Factory UI 中管線工具列上的 [偵錯]。 執行的輸出和狀態,包括錯誤,會顯示在 Azure Data Factory 管線的 [輸出] 索引標籤中。 使用 Delta Live Tables UI 來檢視管線更新的詳細數據。
提示
常見的工作流程需求是在上一個工作完成之後啟動工作。 由於 Delta Live Tables updates
請求是非同步的—請求在啟動更新後會返回,但在更新完成之前—Azure Data Factory 管線中依賴 Delta Live Tables 更新的工作必須待更新完成。 為了等待更新完成,可以選擇在觸發 Delta Live Tables 更新的 Web 活動之後,新增 Until 活動。 在 Until 活動中: