你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 Airflow 任务日志对清单引入问题进行故障排除

本文帮助你使用 Airflow 任务日志解决 Azure Data Manager for Energy 中的清单引入工作流问题。

清单引入 DAG 工作流类型

用于清单提取的有向无环图 (DAG) 工作流程有两种类型:单个清单和批量上传。

单个清单

一个清单文件用于触发清单引入工作流程。

DagTaskName 值 说明
update_status_running_task 调用工作流服务,并在数据库中将 DAG 的状态标记为 running
check_payload_type 验证引入的类型是批处理还是单个清单。
validate_manifest_schema_task 确保清单中提到的所有架构类型都存在并且具有引用架构完整性。 系统会从清单中逐出所有无效值。
provide_manifest_integrity_task 验证 OSDU® R3 清单内的引用并删除无效实体。 此运算符用于父/子验证。 所有类似孤立的实体都会被记录并从经过验证的清单中排除。 搜索任何外部引用的记录。 如果未找到清单实体,则会删除清单实体。 系统也会解析所有代理键引用。
process_single_manifest_file_task 执行从上一步获取的最终清单实体的引入。 通过存储服务引入数据记录。
update_status_finished_task 调用工作流服务,并在数据库中将 DAG 的状态标记为 finishedfailed

批量上传

多个清单文件是同一工作流服务请求的一部分。 请求有效负载中的清单部分是一个列表,而不是项目字典。

DagTaskName 值 说明
update_status_running_task 调用工作流服务,并在数据库中将 DAG 的状态标记为 running
check_payload_type 验证引入的类型是批处理还是单个清单。
batch_upload 将清单列表划分为三批,以便并行处理。 (不发出任何任务日志。)
process_manifest_task_(1 / 2 / 3) 将清单列表划分为三组并对其进行处理。 系统会在这些任务中压缩并按顺序执行 validate_manifest_schema_taskprovide_manifest_integrity_taskprocess_single_manifest_file_task 中执行的所有步骤。
update_status_finished_task 调用工作流服务,并在数据库中将 DAG 的状态标记为 finishedfailed

根据有效负载类型(单个或批量),check_payload_type 任务会选择适当的分支并跳过其他分支中的任务。

先决条件

应已将 Airflow 任务日志与 Azure Monitor 集成。 请查看“将 Airflow 日志与 Azure Monitor 集成”。

Airflow 任务日志中公开了以下列,以便你调试问题:

参数名称 说明
RunID 触发的 DAG 运行的唯一运行 ID。
CorrelationID DAG 运行的唯一关联 ID(与运行 ID 相同)。
DagName DAG 工作流名称。 例如,Osdu_ingest 是清单引入的工作流名称。
DagTaskName DAG 工作流的任务名称。 例如,update_status_running_task 是清单引入的任务名称。
Content Airflow 在任务执行期间发出的错误日志消息(错误或异常)。
LogTimeStamp DAG 运行的时间间隔。
LogLevel 错误的级别。 值为 DEBUGINFOWARNINGERROR。 可以通过在 ERROR 级别进行筛选来查看大多数异常和错误消息。

失败的 DAG 运行

Update_status_running_taskUpdate_status_finished_task 中的工作流运行失败,并且未引入数据记录。

可能的原因

  • 由于数据分区 ID 不正确,对分区 API 的调用未经过身份验证。
  • 请求正文的执行上下文中的键名称不正确。
  • 工作流服务未运行或抛出 5xx 错误。

工作流状态

工作流状态标记为 failed

解决方案

检查 Airflow 任务日志中是否存在 update_status_running_taskupdate_status_finished_task。 通过传递正确的数据分区 ID 或键名称来修复有效负载。

示例 Kusto 查询:

    OEPAirFlowTask
        | where DagName == "Osdu_ingest"
        | where DagTaskName == "update_status_running_task"
        | where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
        | where RunID == '<run_id>'

示例跟踪输出:

    [2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
        data_partition_id = ctx_payload['data-partition-id']
    KeyError: 'data-partition-id'
    
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264

架构验证失败

记录引入失败,因为架构验证失败。

可能的原因

  • 架构服务抛出“找不到架构”错误。
  • 清单正文不符合架构类型。
  • 架构引用不正确。
  • 架构服务抛出 5xx 错误。

工作流状态

工作流状态标记为 finished。 由于跳过了无效实体并继续引入,因此不会观察到工作流状态中的失败。

解决方案

检查 Airflow 任务日志中是否存在 validate_manifest_schema_taskprocess_manifest_task。 通过传递正确的数据分区 ID 或键名称来修复有效负载。

示例 Kusto 查询:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID == "<run_id>"
    | order by ['time'] asc  

示例跟踪输出:

    Error traces to look out for
    [2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
    [2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
    [2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
    [2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
    [2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
    [2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
    
    Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
        {'description': 'The stop value/last value of the ReferenceCurveID, '
                        'typically the end depth of the logging.',
         'example': 7500,
         'title': 'Sampling Stop',
         'type': 'number',
         'x-osdu-frame-of-reference': 'UOM'}
    
    On instance['data']['SamplingStop']:
        'string-value'

失败的引用检查

未能引入记录,因为引用检查失败。

可能的原因

  • 找不到引用的记录。
  • 找不到父记录。
  • 搜索服务抛出 5xx 错误。

工作流状态

工作流状态标记为 finished。 由于跳过了无效实体并继续引入,因此不会观察到工作流状态中的失败。

解决方案

检查 Airflow 任务日志中是否存在 provide_manifest_integrity_taskprocess_manifest_task

示例 Kusto 查询:

    OEPAirFlowTask
        | where DagName has "Osdu_ingest"
        | where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
        | where Content has 'Search query "'or Content has 'response ids: ['
        | where RunID has "<run_id>"

由于没有专门用于引用完整性任务的错误日志,因此请检查调试日志语句,以查看是否通过搜索服务提取了所有外部记录。

例如,以下示例跟踪输出显示通过搜索服务查询的记录,以获取引用完整性:

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"

输出会显示系统中检索到的记录。 如果发现某些记录不存在,则引用记录的相关清单对象将被删除并且不再被引入。

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a    ']

记录未引入,因为清单包含无效的法律标记或访问控制列表 (ACL)。

可能的原因

  • ACL 不正确。
  • 法律标记不正确。
  • 存储服务抛出 5xx 错误。

工作流状态

工作流状态标记为 finished。 你不会在工作流状态中观察到失败。

解决方案

检查 Airflow 任务日志中是否存在 process_single_manifest_file_taskprocess_manifest_task

示例 Kusto 查询:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID has "<run_id>"
    | order by ['time'] asc 

示例跟踪输出:

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
    

输出会指示检索到的记录。 与丢失的搜索记录相对应的清单实体记录将被删除且不会被引入。

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
    [2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb

已知问题

  • 由于引用完整性任务没有特定的错误日志,因此必须手动搜索调试日志语句,以查看是否通过搜索服务检索了所有外部记录。

后续步骤

请继续阅读以下教程,了解如何执行基于清单的文件引入:

参考