Airflow タスク ログを使用したマニフェスト インジェストの問題のトラブルシューティング
この記事は、Airflow タスク ログを使用して、Azure Data Manager for Energy でのマニフェスト インジェストに関するワークフローの問題のトラブルシューティングに役立ちます。
マニフェスト インジェスト DAG ワークフローの種類
マニフェスト インジェストには、単一マニフェストとバッチ アップロードの 2 種類の有向非巡回グラフ (DAG) ワークフローがあります。
単一マニフェスト
1 つのマニフェスト ファイルを使用して、マニフェスト インジェスト ワークフローをトリガーします。
DagTaskName 値 | 説明 |
---|---|
update_status_running_task |
ワークフロー サービスを呼び出し、データベースで DAG の状態を running とマークします。 |
check_payload_type |
インジェストの種類がバッチ マニフェストか単一マニフェストかを検証します。 |
validate_manifest_schema_task |
マニフェストに記載されているすべてのスキーマ型が存在し、参照スキーマの整合性があることを確認します。 無効な値はすべてマニフェストから削除されます。 |
provide_manifest_integrity_task |
OSDU® R3 マニフェスト内の参照を検証し、無効なエンティティを削除します。 この演算子は、親/子の検証を担当します。 孤立したエンティティはすべてログに記録され、検証されたマニフェストから除外されます。 参照先の外部レコードが検索されます。 何も見つからない場合、マニフェスト エンティティは削除されます。 すべての代理キー参照も解決されます。 |
process_single_manifest_file_task |
前の手順から取得した最終的なマニフェスト エンティティのインジェストを実行します。 データ レコードは、ストレージ サービスを介して取り込まれます。 |
update_status_finished_task |
ワークフロー サービスを呼び出し、データベースで DAG の状態を finished または failed とマークします。 |
バッチ アップロード
複数のマニフェスト ファイルが同じワークフロー サービス要求の一部です。 要求ペイロードのマニフェスト セクションは、項目の辞書ではなくリストです。
DagTaskName 値 | 説明 |
---|---|
update_status_running_task |
ワークフロー サービスを呼び出し、データベースで DAG の状態を running とマークします。 |
check_payload_type |
インジェストの種類がバッチ マニフェストか単一マニフェストかを検証します。 |
batch_upload |
マニフェストの一覧を 3 つのバッチに分割し、並列で処理します。 (タスク ログは出力されません)。 |
process_manifest_task_(1 / 2 / 3) |
マニフェストの一覧を 3 つのグループに分割し、処理します。 validate_manifest_schema_task 、provide_manifest_integrity_task 、process_single_manifest_file_task で実行されるすべてのステップは圧縮され、これらのタスクで順番に実行されます。 |
update_status_finished_task |
ワークフロー サービスを呼び出し、データベースで DAG の状態を finished または failed とマークします。 |
ペイロードの種類 (単一またはバッチ) に基づいて、check_payload_type
タスクは適切なブランチを選択し、もう一方のブランチのタスクをスキップします。
前提条件
Airflow タスク ログを Azure Monitor と統合する必要があります。 「Airflow ログと Azure Monitor を統合する方法」を参照してください。
次の列は、問題をデバッグするために、エアフロー タスク ログで公開されています。
パラメーター名 | 説明 |
---|---|
RunID |
トリガーされた DAG 実行の一意の実行 ID。 |
CorrelationID |
DAG 実行の一意の関連付け ID (実行 ID と同じ)。 |
DagName |
DAG ワークフロー名。 たとえば、Osdu_ingest はマニフェスト インジェストのワークフロー名です。 |
DagTaskName |
DAG ワークフローのタスク名。 たとえば、update_status_running_task はマニフェスト インジェストのタスク名です。 |
Content |
タスクの実行中に Airflow が出力するエラー ログ メッセージ (エラーまたは例外)。 |
LogTimeStamp |
DAG の実行の期間。 |
LogLevel |
エラーのレベル。 値は DEBUG 、INFO 、WARNING 、ERROR です。 ほとんどの例外とエラー メッセージは、ERROR レベルでフィルター処理することで確認できます。 |
失敗した DAG の実行
ワークフローの実行が Update_status_running_task
または Update_status_finished_task
で失敗し、データ レコードが取り込まれませんでした。
考えられる原因
- データ パーティション ID が正しくなかったため、パーティション API の呼び出しが認証されなかった。
- 要求本文の実行コンテキストのキー名が正しくない。
- ワークフロー サービスが実行されていないか、5xx エラーがスローされている。
ワークフローの状態
ワークフローの状態は、failed
とマークされます。
解決策
update_status_running_task
または update_status_finished_task
がないか Airflow タスク ログを確認します。 正しいデータ パーティション ID またはキー名を渡してペイロードを修正します。
サンプル Kusto クエリ:
OEPAirFlowTask
| where DagName == "Osdu_ingest"
| where DagTaskName == "update_status_running_task"
| where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
| where RunID == '<run_id>'
サンプル トレース出力:
[2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
data_partition_id = ctx_payload['data-partition-id']
KeyError: 'data-partition-id'
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264
スキーマの検証に失敗
スキーマの検証に失敗したため、レコードが取り込まれませんでした。
考えられる原因
- スキーマ サービスが "スキーマが見つかりません" というエラーをスローしている。
- マニフェスト本文がスキーマ型に準拠していない。
- スキーマ参照が正しくない。
- スキーマ サービスが 5xx エラーをスローしている。
ワークフローの状態
ワークフローの状態は、finished
とマークされます。 無効なエンティティがスキップされ、インジェストが続行されるため、ワークフローの状態でエラーが発生することはありません。
解決策
validate_manifest_schema_task
または process_manifest_task
がないか Airflow タスク ログを確認します。 正しいデータ パーティション ID またはキー名を渡してペイロードを修正します。
サンプル Kusto クエリ:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID == "<run_id>"
| order by ['time'] asc
サンプル トレース出力:
Error traces to look out for
[2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
[2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
[2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
[2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
[2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
[2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
[2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
[2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
{'description': 'The stop value/last value of the ReferenceCurveID, '
'typically the end depth of the logging.',
'example': 7500,
'title': 'Sampling Stop',
'type': 'number',
'x-osdu-frame-of-reference': 'UOM'}
On instance['data']['SamplingStop']:
'string-value'
参照チェックの失敗
参照チェックが失敗したため、レコードが取り込まれませんでした。
考えられる原因
- 参照されたレコードが見つからなかった。
- 親レコードが見つからなかった。
- 検索サービスが 5xx エラーをスローしている。
ワークフローの状態
ワークフローの状態は、finished
とマークされます。 無効なエンティティがスキップされ、インジェストが続行されるため、ワークフローの状態でエラーが発生することはありません。
解決策
provide_manifest_integrity_task
または process_manifest_task
がないか Airflow タスク ログを確認します。
サンプル Kusto クエリ:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
| where Content has 'Search query "'or Content has 'response ids: ['
| where RunID has "<run_id>"
参照整合性タスク専用のエラー ログがないため、デバッグ ログ ステートメントを調べて、すべての外部レコードが検索サービスを介してフェッチされたかどうかを確認します。
たとえば、次のサンプル トレース出力は、参照整合性のために検索サービスを介して照会されたレコードを示しています。
[2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"
出力には、システムで取得され、存在していたレコードが表示されます。 レコードの一部が存在しない場合は、レコードを参照していた関連するマニフェスト オブジェクトは削除され、取り込まれなくなります。
[2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a ']
マニフェストの有効なタグまたは ACL が無効
マニフェストに無効な法的タグまたはアクセス制御リスト (ACL) が含まれているため、レコードは取り込まれませんでした。
考えられる原因
- ACL が正しくない。
- 法的タグが正しくない。
- ストレージ サービスが 5xx エラーをスローしている。
ワークフローの状態
ワークフローの状態は、finished
とマークされます。 ワークフローの状態にエラーがありません。
解決策
process_single_manifest_file_task
または process_manifest_task
がないか Airflow タスク ログを確認します。
サンプル Kusto クエリ:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID has "<run_id>"
| order by ['time'] asc
サンプル トレース出力:
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
出力は、取得されたレコードを示します。 不足している検索レコードに対応するマニフェスト エンティティ レコードは削除され、取り込まれません。
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
[2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb
既知の問題
- 参照整合性タスクの特定のエラー ログがないため、デバッグ ログ ステートメントを手動で検索して、すべての外部レコードが検索サービスを介して取得されたかどうかを確認する必要があります。
次のステップ
次のチュートリアルに進み、マニフェスト ベースのファイル インジェストを実行する方法を学習します。