Compartilhar via


Solucione problemas de ingestão de manifesto usando registros de tarefas do Airflow

Esse artigo ajuda-o a resolver problemas de fluxo de trabalho com ingestão de manifesto no Azure Data Manager for Energy utilizando registos de tarefas do Airflow.

Tipos de fluxo de trabalho DAG de ingestão de manifesto

Existem dois tipos de fluxos de trabalho de gráfico acíclico direcionado (DAG) para ingestão de manifesto: manifesto único e fazer upload em lote.

Manifesto único

Um único arquivo de manifesto é usado para acionar o fluxo de trabalho de ingestão de manifesto.

Valor DagTaskName Descrição
update_status_running_task Chama o serviço de fluxo de trabalho e marca o status do DAG como running no banco de dados.
check_payload_type Valida se o tipo de ingestão é em lote ou manifesto único.
validate_manifest_schema_task Garante que todos os tipos de esquema mencionados no manifesto estejam presentes e que haja integridade de esquema referencial. Todos os valores inválidos são removidos do manifesto.
provide_manifest_integrity_task Valida referências dentro do manifesto OSDU® R3 e remove entidades inválidas. Esse operador é responsável pela validação pai/filho. Todas as entidades órfãs são registradas e excluídas do manifesto validado. Quaisquer registros referenciados externos são pesquisados. Se nenhum for encontrado, a entidade manifesta será descartada. Todas as referências de chave substituta também são resolvidas.
process_single_manifest_file_task Executa a ingestão das entidades manifestas finais obtidas na etapa anterior. Os registros de dados são ingeridos por meio do serviço de armazenamento.
update_status_finished_task Chama o serviço de fluxo de trabalho e marca o status do DAG como finished ou failed no banco de dados.

Carregamento em lote

Vários arquivos de manifesto fazem parte da mesma solicitação de serviço de fluxo de trabalho. A seção de manifesto na carga da solicitação é uma lista em vez de um dicionário de itens.

Valor DagTaskName Descrição
update_status_running_task Chama o serviço de fluxo de trabalho e marca o status do DAG como running no banco de dados.
check_payload_type Valida se o tipo de ingestão é em lote ou manifesto único.
batch_upload Divide a lista de manifestos em três lotes a serem processados em paralelo. (Nenhum log de tarefa é emitido.)
process_manifest_task_(1 / 2 / 3) Divide a lista de manifestos em grupos de três e os processa. Todas as etapas executadas em validate_manifest_schema_task, provide_manifest_integrity_task, e process_single_manifest_file_task são condensadas e executadas sequencialmente nessas tarefas.
update_status_finished_task Chama o serviço de fluxo de trabalho e marca o status do DAG como finished ou failed no banco de dados.

Com base no tipo de carga útil (única ou em lote), a tarefa check_payload_type escolhe a ramificação apropriada e ignora as tarefas na outra ramificação.

Pré-requisitos

Você deve ter logs de tarefas integrados do Airflow com o Azure Monitor. Veja Integrar logs do Airflow com o Azure Monitor.

As colunas a seguir são expostas nos logs de tarefas do Airflow para você depurar o problema:

Nome do Parâmetro Descrição
RunID ID de execução exclusivo da execução do DAG acionada.
CorrelationID ID de correlação exclusivo da execução do DAG (igual ao ID da execução).
DagName Nome do fluxo de trabalho do DAG. Por exemplo, Osdu_ingest é o nome do fluxo de trabalho para ingestão de manifesto.
DagTaskName Nome da tarefa para o fluxo de trabalho do DAG. Por exemplo, update_status_running_task é o nome da tarefa para ingestão de manifesto.
Content Mensagens de log de erros (erros ou exceções) que o Airflow emite durante a execução da tarefa.
LogTimeStamp Intervalo de tempo de execuções do DAG.
LogLevel Nível do erro. Os valores são DEBUG, INFO, WARNING, e ERROR. Você pode ver a maioria das mensagens de exceção e de erro filtrando no nível ERROR.

Falha na execução do DAG

A execução do fluxo de trabalho falhou em Update_status_running_task ou Update_status_finished_task e os registros de dados não foram ingeridos.

Possíveis motivos

  • A chamada para a API de partição não foi autenticada porque o ID da partição de dados está incorreto.
  • Um nome de chave no contexto de execução do corpo da solicitação está incorreto.
  • O serviço de fluxo de trabalho não está em execução ou gera erros 5xx.

Status do fluxo de trabalho

O status do fluxo de trabalho é marcado como failed.

Solução

Verifique os registros de tarefas do Airflow para update_status_running_task ou update_status_finished_task. Corrija a carga passando o ID da partição de dados ou o nome da chave correto.

Exemplo de Consulta do Kusto:

    OEPAirFlowTask
        | where DagName == "Osdu_ingest"
        | where DagTaskName == "update_status_running_task"
        | where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
        | where RunID == '<run_id>'

Exemplo de saída de rastreamento:

    [2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
        data_partition_id = ctx_payload['data-partition-id']
    KeyError: 'data-partition-id'
    
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264

Falha na validação do esquema

Os registros não foram ingeridos porque a validação do esquema falhou.

Possíveis motivos

  • O serviço de esquema está gerando erros de "Esquema não encontrado".
  • O corpo do manifesto não está em conformidade com o tipo de esquema.
  • As referências do esquema estão incorretas.
  • O serviço de esquema está gerando erros 5xx.

Status do fluxo de trabalho

O status do fluxo de trabalho é marcado como finished. Você não observa uma falha no status do fluxo de trabalho porque as entidades inválidas são ignoradas e a ingestão continua.

Solução

Verifique os registros de tarefas do Airflow para validate_manifest_schema_task ou process_manifest_task. Corrija a carga passando o ID da partição de dados ou o nome da chave correto.

Exemplo de Consulta do Kusto:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID == "<run_id>"
    | order by ['time'] asc  

Exemplo de saída de rastreamento:

    Error traces to look out for
    [2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
    [2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
    [2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
    [2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
    [2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
    [2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
    
    Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
        {'description': 'The stop value/last value of the ReferenceCurveID, '
                        'typically the end depth of the logging.',
         'example': 7500,
         'title': 'Sampling Stop',
         'type': 'number',
         'x-osdu-frame-of-reference': 'UOM'}
    
    On instance['data']['SamplingStop']:
        'string-value'

Falha nas verificações de referência

Os registros não foram ingeridos porque as verificações de referência falharam.

Possíveis motivos

  • Os registros referenciados não foram encontrados.
  • Os registros dos pais não foram encontrados.
  • O serviço de pesquisa está gerando erros 5xx.

Status do fluxo de trabalho

O status do fluxo de trabalho é marcado como finished. Você não observa uma falha no status do fluxo de trabalho porque as entidades inválidas são ignoradas e a ingestão continua.

Solução

Verifique os registros de tarefas do Airflow para provide_manifest_integrity_task ou process_manifest_task.

Exemplo de Consulta do Kusto:

    OEPAirFlowTask
        | where DagName has "Osdu_ingest"
        | where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
        | where Content has 'Search query "'or Content has 'response ids: ['
        | where RunID has "<run_id>"

Como não há logs de erros específicos para tarefas de integridade referencial, verifique as instruções do log de depuração para ver se todos os registros externos foram buscados por meio do serviço de pesquisa.

Por exemplo, o exemplo de saída de rastreio a seguir mostra um registro consultado por meio do serviço de pesquisa para integridade referencial:

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"

A saída mostra os registros que foram recuperados e estavam no sistema. O objeto de manifesto relacionado que referenciava um registro seria descartado e não seria mais ingerido se você percebesse que alguns dos registros não estavam presentes.

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a    ']

Os registros não foram ingeridos porque o manifesto contém marcas legais ou listas de controle de acesso (ACLs) inválidas.

Possíveis motivos

  • As ACLs estão incorretas.
  • As marcas legais estão incorretas.
  • O serviço de armazenamento está gerando erros 5xx.

Status do fluxo de trabalho

O status do fluxo de trabalho é marcado como finished. Você não observa uma falha no status do fluxo de trabalho.

Solução

Verifique os registros de tarefas do Airflow para process_single_manifest_file_task ou process_manifest_task.

Exemplo de Consulta do Kusto:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID has "<run_id>"
    | order by ['time'] asc 

Exemplo de saída de rastreamento:

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
    

A saída indica registros que foram recuperados. Os registros de entidade de manifesto que correspondem aos registros de pesquisa ausentes são eliminados e não ingeridos.

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
    [2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb

Problemas conhecidos

  • Como não há logs de erros específicos para tarefas de integridade referencial, você deve procurar manualmente as instruções de log de depuração para ver se todos os registros externos foram recuperados por meio do serviço de pesquisa.

Próximas etapas

Avance para o tutorial a seguir e saiba como realizar uma ingestão de arquivos baseada em manifesto:

Referências