你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
使用 Airflow 任务日志对清单引入问题进行故障排除
本文帮助你使用 Airflow 任务日志解决 Azure Data Manager for Energy 中的清单引入工作流问题。
清单引入 DAG 工作流类型
用于清单提取的有向无环图 (DAG) 工作流程有两种类型:单个清单和批量上传。
单个清单
一个清单文件用于触发清单引入工作流程。
DagTaskName 值 | 说明 |
---|---|
update_status_running_task |
调用工作流服务,并在数据库中将 DAG 的状态标记为 running 。 |
check_payload_type |
验证引入的类型是批处理还是单个清单。 |
validate_manifest_schema_task |
确保清单中提到的所有架构类型都存在并且具有引用架构完整性。 系统会从清单中逐出所有无效值。 |
provide_manifest_integrity_task |
验证 OSDU® R3 清单内的引用并删除无效实体。 此运算符用于父/子验证。 所有类似孤立的实体都会被记录并从经过验证的清单中排除。 搜索任何外部引用的记录。 如果未找到清单实体,则会删除清单实体。 系统也会解析所有代理键引用。 |
process_single_manifest_file_task |
执行从上一步获取的最终清单实体的引入。 通过存储服务引入数据记录。 |
update_status_finished_task |
调用工作流服务,并在数据库中将 DAG 的状态标记为 finished 或 failed 。 |
批量上传
多个清单文件是同一工作流服务请求的一部分。 请求有效负载中的清单部分是一个列表,而不是项目字典。
DagTaskName 值 | 说明 |
---|---|
update_status_running_task |
调用工作流服务,并在数据库中将 DAG 的状态标记为 running 。 |
check_payload_type |
验证引入的类型是批处理还是单个清单。 |
batch_upload |
将清单列表划分为三批,以便并行处理。 (不发出任何任务日志。) |
process_manifest_task_(1 / 2 / 3) |
将清单列表划分为三组并对其进行处理。 系统会在这些任务中压缩并按顺序执行 validate_manifest_schema_task 、provide_manifest_integrity_task 和 process_single_manifest_file_task 中执行的所有步骤。 |
update_status_finished_task |
调用工作流服务,并在数据库中将 DAG 的状态标记为 finished 或 failed 。 |
根据有效负载类型(单个或批量),check_payload_type
任务会选择适当的分支并跳过其他分支中的任务。
先决条件
应已将 Airflow 任务日志与 Azure Monitor 集成。 请查看“将 Airflow 日志与 Azure Monitor 集成”。
Airflow 任务日志中公开了以下列,以便你调试问题:
参数名称 | 说明 |
---|---|
RunID |
触发的 DAG 运行的唯一运行 ID。 |
CorrelationID |
DAG 运行的唯一关联 ID(与运行 ID 相同)。 |
DagName |
DAG 工作流名称。 例如,Osdu_ingest 是清单引入的工作流名称。 |
DagTaskName |
DAG 工作流的任务名称。 例如,update_status_running_task 是清单引入的任务名称。 |
Content |
Airflow 在任务执行期间发出的错误日志消息(错误或异常)。 |
LogTimeStamp |
DAG 运行的时间间隔。 |
LogLevel |
错误的级别。 值为 DEBUG 、INFO 、WARNING 和 ERROR 。 可以通过在 ERROR 级别进行筛选来查看大多数异常和错误消息。 |
失败的 DAG 运行
Update_status_running_task
或 Update_status_finished_task
中的工作流运行失败,并且未引入数据记录。
可能的原因
- 由于数据分区 ID 不正确,对分区 API 的调用未经过身份验证。
- 请求正文的执行上下文中的键名称不正确。
- 工作流服务未运行或抛出 5xx 错误。
工作流状态
工作流状态标记为 failed
。
解决方案
检查 Airflow 任务日志中是否存在 update_status_running_task
或 update_status_finished_task
。 通过传递正确的数据分区 ID 或键名称来修复有效负载。
示例 Kusto 查询:
OEPAirFlowTask
| where DagName == "Osdu_ingest"
| where DagTaskName == "update_status_running_task"
| where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
| where RunID == '<run_id>'
示例跟踪输出:
[2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
data_partition_id = ctx_payload['data-partition-id']
KeyError: 'data-partition-id'
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264
架构验证失败
记录引入失败,因为架构验证失败。
可能的原因
- 架构服务抛出“找不到架构”错误。
- 清单正文不符合架构类型。
- 架构引用不正确。
- 架构服务抛出 5xx 错误。
工作流状态
工作流状态标记为 finished
。 由于跳过了无效实体并继续引入,因此不会观察到工作流状态中的失败。
解决方案
检查 Airflow 任务日志中是否存在 validate_manifest_schema_task
或 process_manifest_task
。 通过传递正确的数据分区 ID 或键名称来修复有效负载。
示例 Kusto 查询:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID == "<run_id>"
| order by ['time'] asc
示例跟踪输出:
Error traces to look out for
[2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
[2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
[2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
[2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
[2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
[2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
[2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
[2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
{'description': 'The stop value/last value of the ReferenceCurveID, '
'typically the end depth of the logging.',
'example': 7500,
'title': 'Sampling Stop',
'type': 'number',
'x-osdu-frame-of-reference': 'UOM'}
On instance['data']['SamplingStop']:
'string-value'
失败的引用检查
未能引入记录,因为引用检查失败。
可能的原因
- 找不到引用的记录。
- 找不到父记录。
- 搜索服务抛出 5xx 错误。
工作流状态
工作流状态标记为 finished
。 由于跳过了无效实体并继续引入,因此不会观察到工作流状态中的失败。
解决方案
检查 Airflow 任务日志中是否存在 provide_manifest_integrity_task
或 process_manifest_task
。
示例 Kusto 查询:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
| where Content has 'Search query "'or Content has 'response ids: ['
| where RunID has "<run_id>"
由于没有专门用于引用完整性任务的错误日志,因此请检查调试日志语句,以查看是否通过搜索服务提取了所有外部记录。
例如,以下示例跟踪输出显示通过搜索服务查询的记录,以获取引用完整性:
[2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"
输出会显示系统中检索到的记录。 如果发现某些记录不存在,则引用记录的相关清单对象将被删除并且不再被引入。
[2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a ']
清单中的法律标记或 ACL 无效
记录未引入,因为清单包含无效的法律标记或访问控制列表 (ACL)。
可能的原因
- ACL 不正确。
- 法律标记不正确。
- 存储服务抛出 5xx 错误。
工作流状态
工作流状态标记为 finished
。 你不会在工作流状态中观察到失败。
解决方案
检查 Airflow 任务日志中是否存在 process_single_manifest_file_task
或 process_manifest_task
。
示例 Kusto 查询:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID has "<run_id>"
| order by ['time'] asc
示例跟踪输出:
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
输出会指示检索到的记录。 与丢失的搜索记录相对应的清单实体记录将被删除且不会被引入。
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
[2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb
已知问题
- 由于引用完整性任务没有特定的错误日志,因此必须手动搜索调试日志语句,以查看是否通过搜索服务检索了所有外部记录。
后续步骤
请继续阅读以下教程,了解如何执行基于清单的文件引入: