次の方法で共有


Delta Live Tables パイプラインを監視する

この記事では、Delta Live Tables パイプラインにおいて、組み込みの監視機能と可観測性機能を使用する方法について説明します。 これらの機能では、次のようなタスクがサポートされます。

クエリのパフォーマンスを検査して診断するには、Delta Live パイプラインの Access クエリ履歴を参照してください。 この機能はパブリック プレビュー段階にあります。

パイプライン イベントの通知を追加する

次の場合に通知を受信するように 1 つ以上のメール アドレスを構成できます:

  • パイプライン update は正常に完了します。
  • パイプライン update は、再試行可能エラーまたは再試行不可エラーで失敗します。 このオプションを Select して、すべてのパイプラインエラーの通知を受信します。
  • パイプライン update は、再試行できない (致命的な) エラーで失敗します。 再試行できないエラーが発生した場合にのみ通知を受信するには、このオプションを Select します。
  • 1 つのデータ フローが失敗します。

パイプラインを 作成 または編集するときに電子メール通知を構成するには:

  1. [通知の追加] をクリックします。
  2. 通知を受信する 1 つ以上のメール アドレスを入力します。
  3. 各通知の種類のチェック ボックスをオンにして、構成済みのメール アドレスに送信します。
  4. [通知の追加] をクリックします。

UI で使用できるパイプラインの詳細は何ですか?

パイプライン グラフは、パイプラインのupdateが正常に開始されるとすぐに表示されます。 矢印は、パイプライン内のデータセット間の依存関係を表します。 既定では、パイプラインの詳細ページにtableの最新のupdateが表示されますが、ドロップダウン メニューからは古い更新をselectできます。

詳細には、パイプライン ID、ソース コード、コンピューティング コスト、製品エディション、パイプライン用に構成されたチャネルが含まれます。

データセットの表形式ビューを表示するには、[List] タブをクリックします。List ビューを使用すると、パイプライン内のすべてのデータセットを table の行として表すことができます。パイプライン DAG が大きすぎて、Graph ビューで視覚化するのに役立ちます。 データセット名、型、状態などの複数のフィルターを使用して、table に表示されるデータセットを制御できます。 もう一度 DAG の視覚化に切り替えるには、[グラフ] をクリックします。

[実行するアカウント名] ユーザーはパイプラインの所有者であり、パイプラインの更新はこのユーザーのアクセス許可で実行されます。 run as ユーザーを変更するには、[アクセス許可] をクリックしてパイプラインの所有者を変更します。

データセットの詳細を表示するにはどうすればよいですか?

パイプライン グラフまたはデータセット list 内のデータセットをクリックすると、データセットに関する詳細が表示されます。 詳細には、データセット schema、データ品質メトリック、データセットを定義するソース コードへのリンクが含まれます。

update 履歴を表示する

パイプラインの更新の履歴と状態を表示するには、上部のバーにある [update 履歴] ドロップダウン メニューをクリックします。

ドロップダウン メニューのupdateをSelectすると、updateのグラフ、詳細、イベントが表示されます。 最新の に戻すには、[最新の 表示] をクリックします。

ストリーミング メトリックを表示する

重要

Delta Live Tables のストリーミング可観測性は、公開プレビュー段階にあります。

Delta Live Tables パイプライン内の各ストリーミング フローについて、Apache Kafka、Amazon Kinesis、Auto Loader、Delta tablesなど、Spark Structured Streaming でサポートされているデータ ソースからのストリーミング メトリックを表示できます。 メトリックは Delta Live Tables UI の右ペインにグラフとして表示され、バックログの秒、バックログ バイト、バックログ レコード、バックログ ファイルが含まれます。 グラフには、分単位で集計された最大値が表示され、グラフにカーソルを合わせると、ヒントに最大 values が表示されます。 データは、現在の時刻から過去 48 時間に制限されます。

ストリーミング メトリックを使用できるパイプライン内の Tables は、UI Graph ビューでパイプライン DAG を表示するときに、Delta Live Tables グラフ アイコン アイコンを表示します。 ストリーミング メトリックを表示するには、Delta Live Tables グラフ アイコン をクリックして、右側のウィンドウの [フロー] タブにストリーミング メトリック グラフを表示します。 また、[List]、[ストリーミング メトリックがある] の順にクリックすることで、ストリーミング メトリックのあるtablesのみを表示することもできます。

各ストリーミング ソースでは、特定のメトリックのみがサポートされます。 ストリーミング ソースでサポートされていないメトリックは、UI で表示できません。 次の table は、サポートされているストリーミング ソースで使用できるメトリックを示しています。

source バックログ バイト数 バックログ レコード バックログ秒数 バックログ ファイル
カフカ
キネシス
デルタ
自動ローダー
Google Pub/Sub

Delta Live Tables イベント ログとは

Delta Live Tables イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データ系列など、パイプラインに関連するすべての情報が含まれます。 イベント ログを使用して、データ パイプラインの状態を追跡、把握、および監視することができます。

Delta Live Tables ユーザー インターフェイス、Delta Live TablesAPI、またはイベント ログに直接クエリを実行して、イベント ログ エントリを表示できます。 このセクションでは、イベント ログに直接クエリを実行することに重点を置いています。

また、イベント フックを使用してイベント (例: アラートの送信) がログに記録されると、実行するカスタム アクションを定義することもできます。

イベント ログ schema

次の table では、イベント ログの schemaについて説明します。 フィールドの一部に、details フィールドなど、いくつかのクエリを実行するために解析を必要とする JSON データが含まれています。 Azure Databricks では、JSON フィールドを解析するための : 演算子がサポートされています。 : (コロン記号) 演算子を参照してください。

フィールド 説明
id イベント ログ レコードの一意のidentifier。
sequence イベントを識別および順序付けるためのメタデータを含む JSON ドキュメント。
origin イベントの発生元のメタデータ (クラウド プロバイダー、クラウド プロバイダーリージョン、user_idpipeline_idpipeline_type など) を含む JSON ドキュメント。パイプラインが作成された where (DBSQL または WORKSPACE) を示します。
timestamp イベントが記録された時刻。
message 人が判読できる、イベントを説明するメッセージ。
level イベントの種類 (INFOWARNERRORMETRICS など)。
error エラーが発生した場合の、エラーの詳細説明。
details イベントの構造化された詳細を含む JSON ドキュメント。 これは、イベントの分析に使用される主なフィールドです。
event_type イベントの種類。
maturity_level イベント schemaの安定性。 考えられる values は次のとおりです。

- STABLE: schema は安定しており、変更されません。
- NULL: schema は安定しており、変更されません。 NULL フィールドが追加される前にレコードが作成された場合、この値は maturity_level になる可能性があります (リリース 2022.37)。
- EVOLVING: schema が安定していないため、変更される可能性があります。
- DEPRECATED: schema は非推奨となり、Delta Live Tables ランタイムは、いつでもこのイベントの生成を停止する可能性があります。

イベント ログへのクエリ実行

イベント ログの場所とイベント ログのクエリを実行するインターフェイスは、パイプラインが Hive メタストアと Unity Catalogのどちらを使用するように構成されているかによって異なります。

Hive metastore

パイプライン が Hive メタストアに tables を発行した場合、イベントログは storage の場所で /system/events に格納されます。 たとえば、パイプライン storage の設定を /Users/username/data と構成した場合、イベン トログは DBFS の /Users/username/data/system/events パスに格納されます。

storage 設定を構成していない場合、既定のイベント ログの場所は DBFS の /pipelines/<pipeline-id>/system/events になります。 たとえば、パイプラインの ID が 91de5e48-35ed-11ec-8d3d-0242ac130003 の場合、格納場所は /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events となります。

ビューを作成して、イベント ログのクエリを簡素化できます。 次の例では、event_log_raw という一時ビューを作成します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

<event-log-path> をイベント ログの場所に置き換えます。

パイプライン実行の各インスタンスは、updateと呼ばれます。 多くの場合、最新の updateの情報を抽出する必要があります。 次のクエリを実行して、最新の update の identifier を検索し、latest_update_id 一時ビューに保存します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Azure Databricks ノートブックまたは SQL エディターでイベント ログにクエリを実行できます。 ノートブックまたは SQL エディターを使用して、イベント ログ クエリの例を実行します。

Unity Catalog

パイプライン が Unity Catalogに tables を発行する場合は、event_logtable の値関数 (TVF) を使用してパイプラインのイベントログを取得する必要があります。 パイプラインのイベント ログを取得するには、パイプライン ID または table 名を TVF に渡します。 たとえば、ID 04c78631-3dd7-4856-b2a6-7d84e9b2638b を持つパイプライン向けのイベント ログ レコードを取得するには、次のようにします。

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

tablemy_catalog.my_schema.table1を作成または所有しているパイプラインのイベント ログ レコードを取得するには:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

TVF を呼び出すには、共有クラスターまたは SQL ウェアハウスを使用する必要があります。 たとえば、共有クラスターにアタッチされたノートブックを使用したり、SQL ウェアハウスに接続されている SQL エディター を使用したりすることができます。

パイプラインに対するイベントのクエリを簡略化するために、パイプラインの所有者は event_log TVF 経由でビューを作成できます。 次の例では、パイプラインのイベント ログ経由でビューを作成します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。

Note

event_log TVF はパイプライン所有者のみが呼び出すことができ、event_log TVF 経由で作成されたビューは、パイプライン所有者のみがクエリを実行できます。 ビューを他のユーザーと共有することはできません。

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

<pipeline-ID> を Delta Live Tables パイプラインの一意の identifier に置き換えます。 ID は、Delta Live Tables UI の パイプラインの詳細 パネルで確認できます。

パイプライン実行の各インスタンスは、updateと呼ばれます。 多くの場合、最新の updateの情報を抽出する必要があります。 次のクエリを実行して、最新の update の identifier を検索し、latest_update_id 一時ビューに保存します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

イベント ログに系列情報のクエリを実行する

データ系列に関する情報を含むイベントには、イベントの種類 flow_definition があります。 details:flow_definition オブジェクトには、グラフ内の各リレーションシップを定義する output_datasetinput_datasets が含まれます。

次のクエリを使用して、入力データセットと出力データセットを抽出して系列情報を表示できます。

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
customers null
sales_orders_raw null
sales_orders_cleaned ["customers", "sales_orders_raw"]
sales_order_in_la ["sales_orders_cleaned"]

イベント ログにデータ品質のクエリを実行する

パイプラインのデータセットで期待値を定義すると、データ品質メトリックが details:flow_progress.data_quality.expectations オブジェクトに格納されます。 データ品質に関する情報を含むイベントには、イベントの種類 flow_progress があります。 次の例では、最後のパイプライン updateのデータ品質メトリックを照会します。

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
sales_orders_cleaned valid_order_number 4083 0

ログにクエリを実行してデータ バックログを監視する

Delta Live Tables は、details:flow_progress.metrics.backlog_bytes オブジェクトのバックログに存在するデータの量を追跡します。 バックログ メトリックを含むイベントには、イベントの種類 flow_progress があります。 次の例では、最後のパイプライン updateのバックログ メトリックを照会します。

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Note

パイプラインのデータ ソースの種類と Databricks Runtime のバージョンによっては、バックログ メトリックを使用できない場合があります。

サーバーレスが有効になっていないパイプラインのイベント ログからの拡張自動スケール イベントを監視する

サーバーレス コンピューティングを使用しない DLT パイプラインの場合、パイプラインで拡張自動スケールが有効になっている場合、イベント ログはクラスターのサイズ変更をキャプチャします。 拡張自動スケールに関する情報を含むイベントには、イベントの種類が autoscale。 クラスターのサイズ変更要求の情報は、details:autoscale オブジェクトに格納されます。 次の例では、最後のパイプライン updateに対する拡張自動スケール クラスターのサイズ変更要求を照会します。

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

リソース使用率を監視する

cluster_resources イベントでは、クラスター内のタスク スロットの数、それらのタスク スロットの使用率、スケジュールを待機しているタスクの数に関するメトリックが提供されます。

拡張自動スケールが有効になっている場合、 cluster_resources イベントには、 latest_requested_num_executorsoptimal_num_executorsなど、自動スケール アルゴリズムのメトリックも含まれます。 イベントには、アルゴリズムの状態も、CLUSTER_AT_DESIRED_SIZESCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSBLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION などのさまざまな状態として表示されます。 この情報は、自動スケール イベントと組み合わせて表示して、拡張自動スケールの全体像を提供できます。

次の例では、最後のパイプライン updateのタスク キュー サイズ履歴を照会します。

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

次の例では、最後のパイプライン updateの使用率履歴を照会します。

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

次の例では、強化された自動スケール パイプラインでのみ使用できるメトリックと共に、最新の要求でアルゴリズムによって要求された Executor の数、最新のメトリックに基づいてアルゴリズムによって推奨される Executor の最適な数、自動スケール アルゴリズムの状態など、Executor カウント履歴を照会します。

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Delta Live Tables パイプラインを監査する

Delta Live Tables のイベントログレコードとその他の Azure Databricks 監査ログを使用して、Delta Live Tablesでデータが更新される方法について包括的な理解を get できます。

Delta Live Tables は、パイプライン所有者の credentials を使用して更新を実行します。 パイプライン所有者を更新することで、使用される credentials を変更できます。 Delta Live Tables は、パイプラインの作成、構成の編集、更新のトリガーなど、パイプラインでのアクションについてユーザーを記録します。

Unity Catalog 監査イベントのリファレンスについては、「Unity Catalog のイベント」を参照してください。

イベント ログにユーザー アクションのクエリを実行する

イベント ログを使用して、ユーザー アクションなどのイベントを監査できます。 ユーザー アクションに関する情報を含むイベントには、イベントの種類 user_action があります。

アクションに関する情報は、user_action フィールドの details オブジェクトに格納されます。 次のクエリを使用して、ユーザー イベントの監査ログを作成します。 このクエリで使用される event_log_raw ビューを作成するには、「イベント ログのクエリ実行」を参照してください。

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

ランタイム情報

パイプライン updateのランタイム情報 (たとえば、updateの Databricks Runtime バージョン) を表示できます。

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0