Delta Live Tables パイプラインを監視する
この記事では、Delta Live Tables パイプラインに組み込みの監視機能と監視機能を使用する方法について説明します。 これらの機能では、次のようなタスクがサポートされます。
- パイプラインの更新の進行状況と状態の監視。 「UI で使用できるパイプラインの詳細は何ですか?」を参照してください。
- パイプラインの更新の成功または失敗などのパイプライン イベントに関するアラート。 「パイプライン イベントの通知を追加する」を参照してください。
- データ系列、データ品質メトリック、リソースの使用状況など、パイプラインの更新に関する詳細情報の抽出。 「Delta Live Tables イベント ログのクエリ実行」を参照してください。
- 特定のイベントが発生したときに実行するカスタム アクションの定義。 「イベント フックを使用して Delta Live Tables パイプラインのカスタム監視を定義する」をご覧ください。
クエリのパフォーマンスを検査して診断するには、デルタ ライブ テーブル パイプラインの Access クエリ履歴を参照してください。 この機能はパブリック プレビュー段階にあります。
パイプライン イベントの通知を追加する
次の場合に通知を受信するように 1 つ以上のメール アドレスを構成できます:
- パイプラインの更新が正常に完了しました。
- パイプラインの更新は、再試行可能または再試行不可能なエラーで失敗します。 すべてのパイプラインの失敗について通知を受け取るには、このオプションを選択します。
- パイプラインの更新が失敗し、再試行できない (致命的な) エラーが発生します。 再試行不可能なエラーが発生した場合にのみ通知を受け取るには、このオプションを選択します。
- 1 つのデータ フローが失敗します。
パイプラインを 作成 または編集するときに電子メール通知を構成するには:
- [通知の追加] をクリックします。
- 通知を受信する 1 つ以上のメール アドレスを入力します。
- 各通知の種類のチェック ボックスをオンにして、構成済みのメール アドレスに送信します。
- [通知の追加] をクリックします。
UI で使用できるパイプラインの詳細は何ですか?
パイプライン グラフは、パイプラインの更新が正常に開始されるとすぐに表示されます。 矢印は、パイプライン内のデータセット間の依存関係を表します。 既定では、パイプラインの詳細ページにテーブルの最新の更新が表示されますが、ドロップダウン メニューからは古い更新を選択できます。
詳細には、パイプライン ID、ソース コード、コンピューティング コスト、製品エディション、パイプライン用に構成されたチャネルが含まれます。
データセットの表形式ビューを表示するには、[リスト] タブをクリックします。リスト ビューを使用すると、パイプライン内のすべてのデータセットをテーブルの行として表示できます。これは、パイプライン DAG が大きすぎて、グラフ ビューに視覚化できない場合に便利です。 データセット名、型、状態などの複数のフィルターを使用して、テーブルに表示されるデータセットを制御できます。 もう一度 DAG の視覚化に切り替えるには、[グラフ] をクリックします。
[実行するアカウント名] ユーザーはパイプラインの所有者であり、パイプラインの更新はこのユーザーのアクセス許可で実行されます。 run as
ユーザーを変更するには、[アクセス許可] をクリックしてパイプラインの所有者を変更します。
データセットの詳細を表示するにはどうすればよいですか?
パイプラインのグラフまたはデータセットの一覧でデータセットをクリックすると、そのデータセットに関する詳細が表示されます。 詳細には、データセット スキーマ、データ品質メトリック、データセットを定義するソース コードへのリンクが含まれます。
更新履歴を表示する
パイプラインの更新の履歴と状態を表示するには、上部バーにある [更新履歴] ドロップダウン メニューをクリックします。
ドロップダウン メニューで更新プログラムを選択して、更新プログラムのグラフ、詳細、イベントを表示します。 最新の更新に戻るには、[Show the latest update] (最新の更新を表示) をクリックします。
Delta Live Tables イベント ログのクエリ実行
デルタ ライブ テーブル イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データ系統など、パイプラインに関連するすべての情報が含まれます。 イベント ログを使用して、データ パイプラインの状態を追跡、把握、および監視することができます。
デルタ ライブ テーブル ユーザー インターフェイス、デルタ ライブ テーブル API で、またはイベント ログに直接クエリを実行することによって、イベント ログ エントリを表示できます。 このセクションでは、イベント ログに直接クエリを実行することに重点を置いています。
また、イベント フックを使用してイベント (例: アラートの送信) がログに記録されると、実行するカスタム アクションを定義することもできます。
イベント ログのスキーマ
次の表で、イベント ログのスキーマについて説明します。 フィールドの一部に、details
フィールドなど、いくつかのクエリを実行するために解析を必要とする JSON データが含まれています。 Azure Databricks では、JSON フィールドを解析するための :
演算子がサポートされています。 : (コロン記号) 演算子を参照してください。
フィールド | 説明 |
---|---|
id |
イベント ログ レコードの一意識別子。 |
sequence |
イベントを識別および順序付けるためのメタデータを含む JSON ドキュメント。 |
origin |
パイプラインがどこに作成されたか (DBSQL または WORKSPACE ) を示す、クラウド プロバイダー、クラウド プロバイダー リージョン、user_id 、pipeline_id 、pipeline_type など、イベントの発生元のメタデータを含む JSON ドキュメント。 |
timestamp |
イベントが記録された時刻。 |
message |
人が判読できる、イベントを説明するメッセージ。 |
level |
イベントの種類 (INFO 、WARN 、ERROR 、METRICS など)。 |
error |
エラーが発生した場合の、エラーの詳細説明。 |
details |
イベントの構造化された詳細を含む JSON ドキュメント。 これは、イベントの分析に使用される主なフィールドです。 |
event_type |
イベントの種類。 |
maturity_level |
イベント スキーマの安定性。 指定できる値は次のとおりです。 - STABLE : このスキーマは安定しており、変更されません。- NULL : このスキーマは安定しており、変更されません。 maturity_level フィールドが追加される前にレコードが作成された場合、この値は NULL になる可能性があります (リリース 2022.37)。- EVOLVING : このスキーマは安定していないため、変更される可能性があります。- DEPRECATED : このスキーマは非推奨であり、Delta Live Tables ランタイムはいつでもこのイベントの生成を停止することができます。 |
イベント ログへのクエリ実行
イベント ログの場所とイベント ログのクエリを実行するインターフェイスは、パイプラインで Hive メタストアと Unity カタログのどちらを使用するように構成されているかによって異なります。
Hive metastore
パイプラインが Hive メタストアにテーブルを発行する場合、イベント ログは storage
の場所の下の /system/events
に格納されます。 たとえば、パイプライン storage
の設定を /Users/username/data
と構成した場合、イベン トログは DBFS の /Users/username/data/system/events
パスに格納されます。
storage
設定を構成していない場合、既定のイベント ログの場所は DBFS の /pipelines/<pipeline-id>/system/events
になります。 たとえば、パイプラインの ID が 91de5e48-35ed-11ec-8d3d-0242ac130003
の場合、格納場所は /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events
となります。
ビューを作成して、イベント ログのクエリを簡素化できます。 次の例では、event_log_raw
という一時ビューを作成します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。
CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;
<event-log-path>
をイベント ログの場所に置き換えます。
パイプライン実行の各インスタンスは、"更新" と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリを実行して最新の更新プログラムの識別子を検索し、latest_update_id
一時ビューに保存します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
Azure Databricks ノートブックまたは SQL エディターでイベント ログにクエリを実行できます。 ノートブックまたは SQL エディターを使用して、イベント ログ クエリの例を実行します。
Unity Catalog
パイプラインでテーブルを Unity Catalog に公開する場合は、event_log
テーブル値関数 (TVF) を使用してパイプラインのイベント ログをフェッチする必要があります。 パイプラインのイベント ログを取得するには、パイプライン ID またはテーブル名を TVF に渡します。 たとえば、ID 04c78631-3dd7-4856-b2a6-7d84e9b2638b
を持つパイプライン向けのイベント ログ レコードを取得するには、次のようにします。
SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")
テーブル my_catalog.my_schema.table1
を作成または所有するパイプラインのイベント ログ レコードを取得するには、次のようにします。
SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))
TVF を呼び出すには、共有クラスターまたは SQL ウェアハウスを使用する必要があります。 たとえば、共有クラスターにアタッチされたノートブックを使用したり、SQL ウェアハウスに接続されている SQL エディター を使用したりすることができます。
パイプラインに対するイベントのクエリを簡略化するために、パイプラインの所有者は event_log
TVF 経由でビューを作成できます。 次の例では、パイプラインのイベント ログ経由でビューを作成します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。
Note
event_log
TVF はパイプライン所有者のみが呼び出すことができ、event_log
TVF 経由で作成されたビューは、パイプライン所有者のみがクエリを実行できます。 ビューを他のユーザーと共有することはできません。
CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");
<pipeline-ID>
を Delta Live Tables パイプラインの一意識別子に置き換えます。 ID は、Delta Live Tables UI の [パイプラインの詳細] パネルで確認できます。
パイプライン実行の各インスタンスは、"更新" と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリを実行して最新の更新プログラムの識別子を検索し、latest_update_id
一時ビューに保存します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
イベント ログに系列情報のクエリを実行する
データ系列に関する情報を含むイベントには、イベントの種類 flow_definition
があります。 details:flow_definition
オブジェクトには、グラフ内の各リレーションシップを定義する output_dataset
と input_datasets
が含まれます。
次のクエリを使用して、入力データセットと出力データセットを抽出して系列情報を表示できます。
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
output_dataset |
input_datasets |
---|---|
customers |
null |
sales_orders_raw |
null |
sales_orders_cleaned |
["customers", "sales_orders_raw"] |
sales_order_in_la |
["sales_orders_cleaned"] |
イベント ログにデータ品質のクエリを実行する
パイプラインのデータセットで期待値を定義すると、データ品質メトリックが details:flow_progress.data_quality.expectations
オブジェクトに格納されます。 データ品質に関する情報を含むイベントには、イベントの種類 flow_progress
があります。 次の例では、最後に行ったパイプラインの更新のデータ品質メトリックに対してクエリを実行します。
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
dataset |
expectation |
passing_records |
failing_records |
---|---|---|---|
sales_orders_cleaned |
valid_order_number |
4083 | 0 |
ログにクエリを実行してデータ バックログを監視する
Delta Live Tables は、details:flow_progress.metrics.backlog_bytes
オブジェクトのバックログに存在するデータの量を追跡します。 バックログ メトリックを含むイベントには、イベントの種類 flow_progress
があります。 次の例では、最新のパイプラインの更新のバックログ メトリックに対してクエリを実行します。
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
注意
パイプラインのデータ ソースの種類と Databricks Runtime のバージョンによっては、バックログ メトリックを使用できない場合があります。
サーバーレスが有効になっていないパイプラインのイベント ログからの拡張自動スケール イベントを監視する
サーバーレス コンピューティングを使用しない DLT パイプラインの場合、パイプラインで拡張自動スケールが有効になっている場合、イベント ログはクラスターのサイズ変更をキャプチャします。 拡張自動スケールに関する情報を含むイベントには、イベントの種類が autoscale
。 クラスターのサイズ変更要求の情報は、details:autoscale
オブジェクトに格納されます。 次の例では、最後のパイプライン更新に対する拡張自動スケール クラスターのサイズ変更要求を照会します。
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
リソース使用率を監視する
cluster_resources
イベントでは、クラスター内のタスク スロットの数、それらのタスク スロットの使用率、スケジュールを待機しているタスクの数に関するメトリックが提供されます。
拡張自動スケールが有効になっている場合、 cluster_resources
イベントには、 latest_requested_num_executors
や optimal_num_executors
など、自動スケール アルゴリズムのメトリックも含まれます。 イベントには、アルゴリズムの状態も、CLUSTER_AT_DESIRED_SIZE
、SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
、BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
などのさまざまな状態として表示されます。
この情報は、自動スケール イベントと組み合わせて表示して、拡張自動スケールの全体像を提供できます。
次の例では、最後のパイプライン更新のタスク キュー サイズ履歴に対してクエリを実行します。
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
次の例では、最後のパイプライン更新の使用率の履歴に対してクエリを実行します。
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
次の例では、強化された自動スケール パイプラインでのみ使用できるメトリックと共に、最新の要求でアルゴリズムによって要求された Executor の数、最新のメトリックに基づいてアルゴリズムによって推奨される Executor の最適な数、自動スケール アルゴリズムの状態など、Executor カウント履歴を照会します。
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
Delta Live Tables パイプラインの監査を行う
Delta Live Tables イベント ログ レコードやその他の Azure Databricks 監査ログを使用して、Delta Live Tables でどのようにデータが更新されているのかという全体像を把握できます。
デルタ ライブ テーブルは、パイプライン所有者の資格情報を使用して更新を実行します。 パイプライン所有者を更新することで、使用される認証情報を変更できます。 Delta Live Tables では、パイプラインの作成、構成の編集、更新のトリガーなど、パイプラインに対してアクションをするユーザーが記録されます。
Unity Catalog 監査イベントのリファレンスについては、「Unity Catalog イベント」を参照してください。
イベント ログにユーザー アクションのクエリを実行する
イベント ログを使用して、ユーザー アクションなどのイベントを監査できます。 ユーザー アクションに関する情報を含むイベントには、イベントの種類 user_action
があります。
アクションに関する情報は、details
フィールドの user_action
オブジェクトに格納されます。 次のクエリを使用して、ユーザー イベントの監査ログを作成します。 このクエリで使用される event_log_raw
ビューを作成するには、「イベント ログのクエリ実行」を参照してください。
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
---|---|---|
2021-05-20T19:36:03.517+0000 | START |
user@company.com |
2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
2021-05-27T00:35:51.971+0000 | START |
user@company.com |
ランタイム情報
パイプライン更新のランタイム情報 (更新の Databricks Runtime バージョンなど) を表示できます。
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
---|
11.0 |