Troubleshoot manifest ingestion problems by using Airflow task logs

This article helps you troubleshoot workflow problems with manifest ingestion in Azure Data Manager for Energy by using Airflow task logs.

Manifest ingestion DAG workflow types

There are two types of directed acyclic graph (DAG) workflows for manifest ingestion: single manifest and batch upload.

Single manifest

One single manifest file is used to trigger the manifest ingestion workflow.

DagTaskName value Description
update_status_running_task Calls the workflow service and marks the status of the DAG as running in the database.
check_payload_type Validates whether the type of ingestion is batch or single manifest.
validate_manifest_schema_task Ensures that all the schema types mentioned in the manifest are present and there's referential schema integrity. All invalid values are evicted from the manifest.
provide_manifest_integrity_task Validates references inside the OSDU® R3 manifest and removes invalid entities. This operator is responsible for parent/child validation. All orphan-like entities are logged and excluded from the validated manifest. Any external referenced records are searched. If none are found, the manifest entity is dropped. All surrogate key references are also resolved.
process_single_manifest_file_task Performs ingestion of the final manifest entities obtained from the previous step. Data records are ingested via the storage service.
update_status_finished_task Calls the workflow service and marks the status of the DAG as finished or failed in the database.

Batch upload

Multiple manifest files are part of the same workflow service request. The manifest section in the request payload is a list instead of a dictionary of items.

DagTaskName value Description
update_status_running_task Calls the workflow service and marks the status of the DAG as running in the database.
check_payload_type Validates whether the type of ingestion is batch or single manifest.
batch_upload Divides the list of manifests into three batches to be processed in parallel. (No task logs are emitted.)
process_manifest_task_(1 / 2 / 3) Divides the list of manifests into groups of three and processes them. All the steps performed in validate_manifest_schema_task, provide_manifest_integrity_task, and process_single_manifest_file_task are condensed and performed sequentially in these tasks.
update_status_finished_task Calls the workflow service and marks the status of the DAG as finished or failed in the database.

Based on the payload type (single or batch), the check_payload_type task chooses the appropriate branch and skips the tasks in the other branch.

Prerequisites

You should have integrated Airflow task logs with Azure Monitor. See Integrate Airflow logs with Azure Monitor.

The following columns are exposed in Airflow task logs for you to debug the problem:

Parameter name Description
RunID Unique run ID of the triggered DAG run.
CorrelationID Unique correlation ID of the DAG run (same as the run ID).
DagName DAG workflow name. For instance, Osdu_ingest is the workflow name for manifest ingestion.
DagTaskName Task name for the DAG workflow. For instance, update_status_running_task is the task name for manifest ingestion.
Content Error log messages (errors or exceptions) that Airflow emits during the task execution.
LogTimeStamp Time interval of DAG runs.
LogLevel Level of the error. Values are DEBUG, INFO, WARNING, and ERROR. You can see most exception and error messages by filtering at the ERROR level.

Failed DAG run

The workflow run failed in Update_status_running_task or Update_status_finished_task, and the data records weren't ingested.

Possible reasons

  • Call to partition API wasn't authenticated as the data partition ID is incorrect.
  • A key name in the execution context of the request body is incorrect.
  • The workflow service isn't running or is throwing 5xx errors.

Workflow status

The workflow status is marked as failed.

Solution

Check the Airflow task logs for update_status_running_task or update_status_finished_task. Fix the payload by passing the correct data partition ID or key name.

Sample Kusto query:

    OEPAirFlowTask
        | where DagName == "Osdu_ingest"
        | where DagTaskName == "update_status_running_task"
        | where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
        | where RunID == '<run_id>'

Sample trace output:

    [2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
        data_partition_id = ctx_payload['data-partition-id']
    KeyError: 'data-partition-id'
    
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264

Failed schema validation

Records weren't ingested because schema validation failed.

Possible reasons

  • The schema service is throwing "Schema not found" errors.
  • The manifest body doesn't conform to the schema type.
  • The schema references are incorrect.
  • The schema service is throwing 5xx errors.

Workflow status

The workflow status is marked as finished. You don't observe a failure in the workflow status because the invalid entities are skipped and the ingestion is continued.

Solution

Check the Airflow task logs for validate_manifest_schema_task or process_manifest_task. Fix the payload by passing the correct data partition ID or key name.

Sample Kusto query:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID == "<run_id>"
    | order by ['time'] asc  

Sample trace output:

    Error traces to look out for
    [2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
    [2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
    [2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
    [2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
    [2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
    [2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
    
    Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
        {'description': 'The stop value/last value of the ReferenceCurveID, '
                        'typically the end depth of the logging.',
         'example': 7500,
         'title': 'Sampling Stop',
         'type': 'number',
         'x-osdu-frame-of-reference': 'UOM'}
    
    On instance['data']['SamplingStop']:
        'string-value'

Failed reference checks

Records weren't ingested because reference checks failed.

Possible reasons

  • Referenced records weren't found.
  • Parent records weren't found.
  • The search service is throwing 5xx errors.

Workflow status

The workflow status is marked as finished. You don't observe a failure in the workflow status because the invalid entities are skipped and the ingestion is continued.

Solution

Check the Airflow task logs for provide_manifest_integrity_task or process_manifest_task.

Sample Kusto query:

    OEPAirFlowTask
        | where DagName has "Osdu_ingest"
        | where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
        | where Content has 'Search query "'or Content has 'response ids: ['
        | where RunID has "<run_id>"

Because there are no error logs specifically for referential integrity tasks, check the debug log statements to see whether all external records were fetched via the search service.

For instance, the following sample trace output shows a record queried via the search service for referential integrity:

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"

The output shows the records that were retrieved and were in the system. The related manifest object that referenced a record would be dropped and no longer be ingested if you noticed that some of the records weren't present.

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a    ']

Records weren't ingested because the manifest contains invalid legal tags or access control lists (ACLs).

Possible reasons

  • ACLs are incorrect.
  • Legal tags are incorrect.
  • The storage service is throwing 5xx errors.

Workflow status

The workflow status is marked as finished. You don't observe a failure in the workflow status.

Solution

Check the Airflow task logs for process_single_manifest_file_task or process_manifest_task.

Sample Kusto query:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID has "<run_id>"
    | order by ['time'] asc 

Sample trace output:

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
    

The output indicates records that were retrieved. Manifest entity records that correspond to missing search records are dropped and not ingested.

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
    [2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb

Known issues

  • Because there are no specific error logs for referential integrity tasks, you must manually search for the debug log statements to see whether all external records were retrieved via the search service.

Next steps

Advance to the following tutorial and learn how to perform a manifest-based file ingestion:

References