Compartir a través de


Solución de problemas de ingesta de manifiestos mediante registros de tareas de Airflow

Este artículo le ayuda a solucionar problemas de flujo de trabajo con la ingesta de manifiestos en Azure Data Manager for Energy mediante registros de tareas de Airflow.

Secuencia del flujo de trabajo de ingesta del manifiesto

Hay dos tipos de flujos de trabajo de grafo acíclico dirigido (DAG) para la ingesta de manifiestos: manifiesto único y carga por lotes.

Manifiesto único

Se usa un único archivo de manifiesto para desencadenar el flujo de trabajo de ingesta de manifiestos.

Valor DagTaskName Descripción
update_status_running_task Llama al servicio de flujo de trabajo y marca el estado del DAG como running en la base de datos.
check_payload_type Valida si el tipo de ingesta es por lotes o manifiesto único.
validate_manifest_schema_task Garantiza que todos los tipos de esquema mencionados en el manifiesto estén presentes y que haya integridad de esquema referencial. Todos los valores no válidos se expulsan del manifiesto.
provide_manifest_integrity_task Valida las referencias dentro del manifiesto de OSDU® R3 y quita entidades no válidas. Este operador es responsable de la validación primaria o secundaria. Todas las entidades de tipo huérfano se registran y excluyen del manifiesto validado. Se busca en todos los registros externos a los que se hace referencia. Si no se encuentra ninguno, se quita la entidad de manifiesto. También se resuelven todas las referencias de clave suplentes.
process_single_manifest_file_task Realiza la ingesta de las entidades de manifiesto finales obtenidas del paso anterior. Los registros de datos se ingieren a través del servicio de almacenamiento.
update_status_finished_task Llama al servicio de flujo de trabajo y marca el estado del DAG como finished o failed en la base de datos.

Carga por lotes

Varios archivos de manifiesto forman parte de la misma solicitud de servicio de flujo de trabajo. La sección de manifiesto de la carga de la solicitud es una lista en lugar de un diccionario de elementos.

Valor DagTaskName Descripción
update_status_running_task Llama al servicio de flujo de trabajo y marca el estado del DAG como running en la base de datos.
check_payload_type Valida si el tipo de ingesta es por lotes o manifiesto único.
batch_upload Divide la lista de manifiestos en tres lotes que se van a procesar en paralelo. (No se emiten registros de tareas).
process_manifest_task_(1 / 2 / 3) Divide la lista de manifiestos en grupos de tres y los procesa. Todos los pasos realizados en validate_manifest_schema_task, provide_manifest_integrity_task y process_single_manifest_file_task se condensan y realizan secuencialmente en estas tareas.
update_status_finished_task Llama al servicio de flujo de trabajo y marca el estado del DAG como finished o failed en la base de datos.

En función del tipo de carga (único o por lotes), la tarea check_payload_type elige la rama adecuada y omite las tareas de la otra rama.

Requisitos previos

Debe tener registros de tareas de Airflow integrados con Azure Monitor. Consulte Integración de registros de Airflow con Azure Monitor.

Las columnas siguientes se exponen en los registros de tareas de Airflow para depurar el problema:

Nombre de parámetro Descripción
RunID Identificador de ejecución único de la ejecución de DAG desencadenada.
CorrelationID Identificador de correlación único de la ejecución de DAG (igual que el identificador de ejecución).
DagName Nombre del flujo de trabajo de DAG. Por ejemplo, Osdu_ingest es el nombre del flujo de trabajo para la ingesta de manifiestos.
DagTaskName Nombre de tarea para el flujo de trabajo de DAG. Por ejemplo, update_status_running_task es el nombre de la tarea para la ingesta de manifiestos.
Content Mensajes de registro de errores (errores o excepciones) que emite Airflow durante la ejecución de la tarea.
LogTimeStamp Intervalo de tiempo de ejecuciones de DAG.
LogLevel Nivel del error. Los valores son DEBUG, INFO, WARNING y ERROR. Puede ver la mayoría de los mensajes de excepción y error filtrando en el nivel de ERROR.

Ejecución de DAG con errores

Error en la ejecución del flujo de trabajo en Update_status_running_task o Update_status_finished_task, y no se han ingerido los registros de datos.

Razones posibles

  • La llamada a la API de partición no se ha autenticado porque el identificador de partición de datos es incorrecto.
  • Un nombre de clave en el contexto de ejecución del cuerpo de la solicitud es incorrecto.
  • El servicio de flujo de trabajo no se está ejecutando o produce errores 5xx.

Estado de flujo de trabajo

El estado del flujo de trabajo se marca como failed.

Solución

Compruebe los registros de tareas de Airflow para update_status_running_task o update_status_finished_task. Corrija la carga pasando el identificador de partición de datos o el nombre de clave correctos.

Consulta Kusto de ejemplo:

    OEPAirFlowTask
        | where DagName == "Osdu_ingest"
        | where DagTaskName == "update_status_running_task"
        | where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
        | where RunID == '<run_id>'

Salida de seguimiento de ejemplo:

    [2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
        data_partition_id = ctx_payload['data-partition-id']
    KeyError: 'data-partition-id'
    
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264

Error en la validación del esquema

No se han ingerido registros porque se produjo un error en la validación del esquema.

Razones posibles

  • El servicio de esquema produce errores de "Esquema no encontrado".
  • El cuerpo del manifiesto no se ajusta al tipo de esquema.
  • Las referencias de esquema son incorrectas.
  • El servicio de esquema produce errores 5xx.

Estado de flujo de trabajo

El estado del flujo de trabajo se marca como finished. No se observa un error en el estado del flujo de trabajo porque se omiten las entidades no válidas y se continúa la ingesta.

Solución

Compruebe los registros de tareas de Airflow para validate_manifest_schema_task o process_manifest_task. Corrija la carga pasando el identificador de partición de datos o el nombre de clave correctos.

Consulta Kusto de ejemplo:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID == "<run_id>"
    | order by ['time'] asc  

Salida de seguimiento de ejemplo:

    Error traces to look out for
    [2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
    [2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
    [2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
    [2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
    [2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
    [2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
    
    Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
        {'description': 'The stop value/last value of the ReferenceCurveID, '
                        'typically the end depth of the logging.',
         'example': 7500,
         'title': 'Sampling Stop',
         'type': 'number',
         'x-osdu-frame-of-reference': 'UOM'}
    
    On instance['data']['SamplingStop']:
        'string-value'

Comprobaciones de referencia erróneas

No se han ingerido registros porque se produjo un error en las comprobaciones de referencia.

Razones posibles

  • No se encontraron registros a los que se hace referencia.
  • No se encontraron registros primarios.
  • El servicio de búsqueda produce errores 5xx.

Estado de flujo de trabajo

El estado del flujo de trabajo se marca como finished. No se observa un error en el estado del flujo de trabajo porque se omiten las entidades no válidas y se continúa la ingesta.

Solución

Compruebe los registros de tareas de Airflow para provide_manifest_integrity_task o process_manifest_task.

Consulta Kusto de ejemplo:

    OEPAirFlowTask
        | where DagName has "Osdu_ingest"
        | where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
        | where Content has 'Search query "'or Content has 'response ids: ['
        | where RunID has "<run_id>"

Dado que no hay registros de errores específicamente para las tareas de integridad referencial, compruebe las instrucciones de registro de depuración para ver si todos los registros externos se capturaron a través del servicio de búsqueda.

Por ejemplo, la siguiente salida de seguimiento de ejemplo muestra un registro consultado a través del servicio de búsqueda para la integridad referencial:

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"

La salida muestra los registros recuperados y que estaban en el sistema. El objeto de manifiesto relacionado al que se hace referencia a un registro se quitaría y ya no se ingeriría si observaba que algunos de los registros no estaban presentes.

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a    ']

No se han ingerido registros porque el manifiesto contiene etiquetas legales no válidas o listas de control de acceso (ACL).

Razones posibles

  • Las ACL son incorrectas.
  • Las etiquetas legales son incorrectas.
  • El servicio de almacenamiento produce errores 5xx.

Estado de flujo de trabajo

El estado del flujo de trabajo se marca como finished. No observa un error en el estado del flujo de trabajo.

Solución

Compruebe los registros de tareas de Airflow para process_single_manifest_file_task o process_manifest_task.

Consulta Kusto de ejemplo:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID has "<run_id>"
    | order by ['time'] asc 

Salida de seguimiento de ejemplo:

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
    

La salida indica los registros recuperados. Los registros de entidad de manifiesto que corresponden a los registros de búsqueda que faltan se quitan y no se ingieren.

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
    [2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb

Problemas conocidos

  • Dado que no hay registros de errores específicos para las tareas de integridad referencial, debe buscar manualmente las instrucciones de registro de depuración para ver si todos los registros externos se recuperaron a través del servicio de búsqueda.

Pasos siguientes

Pase al siguiente tutorial y aprenda a realizar una ingesta de archivos basada en manifiesto:

Referencias