監視 DLT 管線
本文說明如何針對 DLT 管線使用內建的監視和可觀察性功能。 這些功能支援下列工作:
- 觀察管線更新的進度和狀態。 請參閱 UI 中有哪些管線詳細數據?。
- 針對管線事件發出警示,例如管線更新成功或失敗。 請參閱 ,新增有關管線事件的電子郵件通知。
- 檢視串流來源的指標,例如 Apache Kafka 和自動載入器(公開預覽)。 請參閱 查看串流指標。
- 擷取管線更新的詳細資訊,例如數據歷程、數據品質計量和資源使用量。 請參閱 什麼是 DLT 事件記錄檔?。
- 定義特定事件發生時要採取的自定義動作。 請參閱 定義 DLT 管線的自訂監控並使用事件鉤子。
若要檢查和診斷查詢效能,請參閱 DLT 管線的存取查詢歷程記錄。 這項功能處於公開預覽狀態。
新增管道事件的電子郵件通知
您可以設定一或多個電子郵件位址,以在發生下列情況時接收通知:
- 管線更新成功完成。
- 管線更新失敗,發生可重試或無法重試的錯誤。 選取此選項可接收所有管線失敗的通知。
- 管線更新失敗,並出現無法重試(嚴重)錯誤。 選取此選項,只有在發生無法重試的錯誤時,才會收到通知。
- 單一數據流失敗。
當您 建立 或編輯管線時,若要設定電子郵件通知:
- 按下 新增通知。
- 輸入一或多個電子郵件位址以接收通知。
- 按兩下每個通知類型的複選框,以傳送至已設定的電子郵件位址。
- 按下 新增通知。
UI 中有哪些管線詳細數據?
管線圖形會在管線更新成功后立即出現。 箭頭代表管線中數據集之間的相依性。 根據預設,管線詳細數據頁面會顯示數據表的最新更新,但您可以從下拉功能表中選取較舊的更新。
詳細數據包括管線標識碼、計算成本、產品版本,以及針對管線設定的通道。
若要查看數據集的表格式檢視,請按兩下 [清單] 索引標籤。清單 檢視可讓您查看管線中以數據列表示的所有數據集,而且當管線 DAG 太大而無法以 Graph 檢視來可視化時很有用。 您可以使用多個篩選來控制數據表中顯示的數據集,例如數據集名稱、類型和狀態。 若要切換回 DAG 視覺效果,請按兩下 Graph。
以 使用者身份執行的使用者是管線的擁有者,而管線更新將以該使用者的權限執行。 若要變更 run as
使用者,請點擊 權限 並變更管線擁有者。
如何檢視數據集詳細數據?
點擊管線圖或資料集清單中的資料集會顯示資料集的詳細資料。 詳細數據報括數據集架構、數據品質計量,以及定義數據集的原始碼連結。
檢視更新歷程記錄
若要檢視管線更新的歷程記錄和狀態,請單擊頂端列中的 [更新歷程記錄] 下拉功能表。
在下拉功能表中選取更新,以檢視更新的圖形、詳細數據和事件。 若要回到最新的更新,請按一下 [顯示最新的更新。
檢視串流指標
重要
DLT 的串流監控能見度現已進入 公開預覽。
您可以檢視 Spark 結構化串流所支持的數據源串流計量,例如 Apache Kafka、Amazon Kinesis、Auto Loader 和 Delta 數據表,用於 DLT 管線中的每個串流流程。 指標會顯示為 DLT UI 右窗格中的圖表,並包含積壓秒數、積壓位元組、積壓記錄和積壓檔案。 圖表會顯示依分鐘匯總的最大值,工具提示會在您將滑鼠停留在圖表上時顯示最大值。 數據限制在目前時間的過去 48 小時內。
當您在 UI 圖形 檢視中查看管線 DAG 時,管線中具有串流計量的數據表會顯示 圖示。 若要檢視串流計量,請按兩下
,在右窗格中的 [流程] 索引標籤中顯示串流計量圖表。 您也可以套用篩選條件,只檢視具有串流計量的數據表,方法是按兩下 [List],然後按兩下 [具有串流計量。
每個串流來源只支援特定的計量。 串流來源不支援的計量無法在UI中檢視。 下表顯示支援的串流來源可用的計量:
源 | 積壓位元組 | 待辦項目記錄 | 待辦積壓時間(秒) | 待辦項目檔案 |
---|---|---|---|---|
卡 夫 卡 | ✓ | ✓ | ||
Kinesis | ✓ | ✓ | ||
三角洲 | ✓ | ✓ | ||
自動載入器 | ✓ | ✓ | ||
Google Pub/Sub | ✓ | ✓ |
什麼是 DLT 事件記錄檔?
DLT 事件記錄檔包含與管線相關的所有資訊,包括稽核記錄、數據質量檢查、管線進度和數據歷程。 您可以使用事件記錄檔來追蹤、瞭解及監視資料管線的狀態。
您可以在 DLT 使用者介面、DLT API中檢視事件日誌條目,或直接查詢事件日誌。 本節著重於直接查詢事件記錄檔。
您也可以定義自訂動作,例如傳送警示,在記錄事件時執行,使用 事件鉤子。
重要
請勿刪除事件記錄檔或發佈事件記錄檔的父目錄或架構。 刪除事件記錄檔可能會導致管線在未來執行期間無法更新。
事件記錄架構
下表描述事件記錄檔架構。 其中有些欄位包含 JSON 數據,需要剖析才能執行某些查詢,例如 details
字段。 Azure Databricks 支援 :
運算符來剖析 JSON 字段。 請參閱 :
(冒號) 運算子。
場地 | 描述 |
---|---|
id |
事件記錄檔記錄的唯一標識符。 |
sequence |
JSON 檔,其中包含用來識別和排序事件的元數據。 |
origin |
JSON 檔,其中包含事件來源的元數據,例如雲端提供者、雲端提供者區域、user_id 、pipeline_id 或 pipeline_type ,以顯示管線的建立位置,DBSQL 或 WORKSPACE 。 |
timestamp |
記錄事件的時間。 |
message |
描述事件的人類可讀訊息。 |
level |
事件類型,例如,INFO 、WARN 、ERROR 或 METRICS 。 |
maturity_level |
事件架構的穩定性。 可能的值為:
|
error |
如果發生錯誤,則提供錯誤的詳情。 |
details |
JSON 檔,其中包含事件的結構化詳細數據。 這是用來分析事件的主要欄位。 |
event_type |
事件類型。 |
查詢事件記錄檔
注意
本節說明使用以 Unity Catalog 和預設發佈模式設定之管線的事件記錄的預設方式及語法。
- 如需使用舊版發佈模式的 Unity Catalog 管線之行為,請參閱 Unity Catalog 舊版發佈模式管線的事件記錄。
- 如需了解 Hive 中繼存放區管线的行為和語法,請參閱 使用 Hive 中繼存放區管线的事件記錄。
根據預設,DLT 會將事件記錄檔寫入至針對管線設定的預設目錄和架構中的隱藏 Delta 資料表。 隱藏時,所有具許可權的使用者仍然可以查詢數據表。 根據預設,只有管線的擁有者可以查詢事件記錄數據表。
根據預設,隱藏事件記錄檔的名稱會格式化為 event_log_{pipeline_id}
,其中管線標識碼是系統指派的 UUID,以虛線取代底線。
您可以與 JSON 組態互動,以發佈事件記錄檔。 當您發佈事件記錄檔時,您可以指定事件記錄檔的名稱,並選擇性地指定目錄和架構,如下列範例所示:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
事件記錄檔位置同時也是管線中任何 Auto Loader 查詢的架構位置。 Databricks 建議在修改許可權之前先建立事件記錄數據表的檢視,因為某些計算設定可能會允許使用者直接共用事件記錄檔數據表時存取架構元數據。 下列範例語法會在事件記錄數據表上建立檢視,並用於本文中包含的範例事件記錄查詢中。
CREATE VIEW event_log_raw
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;
每次執行管道的實例稱為 更新。 您通常會想要擷取最新更新的資訊。 執行下列查詢來尋找最新更新的標識符,並將它儲存在 latest_update
暫存檢視中。 本文中包含的範例事件記錄查詢中會使用此檢視:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
在 Unity Catalog 中,視圖支援串流查詢。 下列範例會使用結構化串流來查詢事件記錄資料表上方定義的檢視:
df = spark.readStream.table("event_log_raw")
管線的擁有者可以將事件記錄檔發佈為公用 Delta 數據表,方法是切換管線組態 Publish event log to metastore
] 區段中的 [] 選項。 您可以選擇性地指定事件記錄檔的新資料表名稱、目錄和架構。
從 事件記錄查詢傳承資訊
包含譜系資訊的事件具有事件類型 flow_definition
。
details:flow_definition
物件包含定義圖形中每個關聯性的 output_dataset
和 input_datasets
。
您可以使用下列查詢來擷取輸入和輸出資料集,以查看譜系資訊:
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
output_dataset |
input_datasets |
---|---|
customers |
null |
sales_orders_raw |
null |
sales_orders_cleaned |
["customers", "sales_orders_raw"] |
sales_order_in_la |
["sales_orders_cleaned"] |
從事件記錄檔查詢數據品質
如果您在管線中定義數據集的預期,數據品質計量會儲存在 details:flow_progress.data_quality.expectations
物件中。 包含資料品質相關資訊的事件具有事件類型 flow_progress
。 下列範例會查詢上次管線更新的數據品質計量:
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
dataset |
expectation |
passing_records |
failing_records |
---|---|---|---|
sales_orders_cleaned |
valid_order_number |
4083 | 0 |
從事件記錄檔查詢自動載入器事件
DLT 會在自動載入器處理檔案時產生事件。 針對自動載入器事件,event_type
為 operation_progress
,且 details:operation_progress:type
為 AUTO_LOADER_LISTING
或 AUTO_LOADER_BACKFILL
。
details:operation_progress
物件也包含 status
、duration_ms
、auto_loader_details:source_path
和 auto_loader_details:num_files_listed
欄位。
下列範例會查詢自動載入器事件以取得最新的更新:
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,
latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest.update_id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL')
查詢事件記錄檔來監控數據積壓
DLT 會追蹤 details:flow_progress.metrics.backlog_bytes
物件中待辦專案中的數據量。 包含待辦項目計量的事件具有事件類型 flow_progress
。 以下範例顯示如何查詢上一次管道更新的累積工作量指標:
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
備註
根據 Databricks Runtime 版本和管線的數據來源類型,積壓指標可能無法取得。
監視未啟用無伺服器之管線事件記錄檔中的增強式自動調整事件
對於不使用無伺服器計算的 DLT 管線,當在管線中啟用增強型自動調整時,事件記錄檔會記錄叢集的縮放。 包含增強式自動調整資訊的事件,具有事件類型 autoscale
。 叢集重設大小要求資訊會儲存在 details:autoscale
物件中。 下列範例會查詢最近一次管線更新的自動伸縮叢集調整大小請求:
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
監視計算資源使用率
cluster_resources
事件會針對叢集中的工作位置數目、使用這些工作位置的數量,以及等待排程的工作數目提供計量。
開啟增強型自動調整時,cluster_resources
事件也包含自動調整演算法的計量,包括 latest_requested_num_executors
和 optimal_num_executors
。 這些事件也會將演算法的狀態顯示為不同的狀態,例如 CLUSTER_AT_DESIRED_SIZE
、SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
和 BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
。
這項資訊可以與自動調整事件一起檢視,以提供增強式自動調整的整體畫面。
下列範例查詢最近管線更新的工作佇列大小歷史:
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
下列範例會查詢上次管線更新的使用率歷程記錄:
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
下列範例會查詢執行程式計數歷程記錄,並隨附僅適用於增強型自動調整管線的計量,包括演算法在最新要求中要求的執行程式數目、演算法根據最新計量所建議的最佳執行程式數目,以及自動調整演算法狀態:
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
審核分散式帳本技術 (DLT) 工作流程
您可以使用 DLT 事件記錄檔記錄和其他 Azure Databricks 稽核記錄,完整瞭解 DLT 中的數據更新方式。
DLT 會使用管線擁有者的認證來執行更新。 您可以藉由更新管線擁有者來變更使用的身份驗證資料。 DLT 會記錄使用者在管線上執行的動作,包括管線建立、設定的編輯,以及觸發更新。
如需 Unity 目錄稽核事件的參考,請參閱 Unity 目錄事件。
查詢事件記錄檔中的用戶動作
您可以使用事件記錄檔來稽核事件,例如用戶動作。 包含使用者動作相關信息的事件具有事件類型 user_action
。
動作的相關信息會儲存在 [user_action
] 字段中 details
物件中。 使用下列查詢來建構使用者事件的稽核記錄。 若要建立此查詢中使用的 event_log_raw
檢視,請參閱 查詢事件記錄檔。
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
---|---|---|
2021-05-20T19:36:03.517+0000 | START |
user@company.com |
2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
2021-05-27T00:35:51.971+0000 | START |
user@company.com |
運行時間資訊
您可以檢視管線更新的執行環境資訊,例如更新的 Databricks Runtime 版本:
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
---|
11.0 |