Solucione problemas de ingestão de manifesto usando registros de tarefas do Airflow
Esse artigo ajuda-o a resolver problemas de fluxo de trabalho com ingestão de manifesto no Azure Data Manager for Energy utilizando registos de tarefas do Airflow.
Tipos de fluxo de trabalho DAG de ingestão de manifesto
Existem dois tipos de fluxos de trabalho de gráfico acíclico direcionado (DAG) para ingestão de manifesto: manifesto único e fazer upload em lote.
Manifesto único
Um único arquivo de manifesto é usado para acionar o fluxo de trabalho de ingestão de manifesto.
Valor DagTaskName | Descrição |
---|---|
update_status_running_task |
Chama o serviço de fluxo de trabalho e marca o status do DAG como running no banco de dados. |
check_payload_type |
Valida se o tipo de ingestão é em lote ou manifesto único. |
validate_manifest_schema_task |
Garante que todos os tipos de esquema mencionados no manifesto estejam presentes e que haja integridade de esquema referencial. Todos os valores inválidos são removidos do manifesto. |
provide_manifest_integrity_task |
Valida referências dentro do manifesto OSDU® R3 e remove entidades inválidas. Esse operador é responsável pela validação pai/filho. Todas as entidades órfãs são registradas e excluídas do manifesto validado. Quaisquer registros referenciados externos são pesquisados. Se nenhum for encontrado, a entidade manifesta será descartada. Todas as referências de chave substituta também são resolvidas. |
process_single_manifest_file_task |
Executa a ingestão das entidades manifestas finais obtidas na etapa anterior. Os registros de dados são ingeridos por meio do serviço de armazenamento. |
update_status_finished_task |
Chama o serviço de fluxo de trabalho e marca o status do DAG como finished ou failed no banco de dados. |
Carregamento em lote
Vários arquivos de manifesto fazem parte da mesma solicitação de serviço de fluxo de trabalho. A seção de manifesto na carga da solicitação é uma lista em vez de um dicionário de itens.
Valor DagTaskName | Descrição |
---|---|
update_status_running_task |
Chama o serviço de fluxo de trabalho e marca o status do DAG como running no banco de dados. |
check_payload_type |
Valida se o tipo de ingestão é em lote ou manifesto único. |
batch_upload |
Divide a lista de manifestos em três lotes a serem processados em paralelo. (Nenhum log de tarefa é emitido.) |
process_manifest_task_(1 / 2 / 3) |
Divide a lista de manifestos em grupos de três e os processa. Todas as etapas executadas em validate_manifest_schema_task , provide_manifest_integrity_task , e process_single_manifest_file_task são condensadas e executadas sequencialmente nessas tarefas. |
update_status_finished_task |
Chama o serviço de fluxo de trabalho e marca o status do DAG como finished ou failed no banco de dados. |
Com base no tipo de carga útil (única ou em lote), a tarefa check_payload_type
escolhe a ramificação apropriada e ignora as tarefas na outra ramificação.
Pré-requisitos
Você deve ter logs de tarefas integrados do Airflow com o Azure Monitor. Veja Integrar logs do Airflow com o Azure Monitor.
As colunas a seguir são expostas nos logs de tarefas do Airflow para você depurar o problema:
Nome do Parâmetro | Descrição |
---|---|
RunID |
ID de execução exclusivo da execução do DAG acionada. |
CorrelationID |
ID de correlação exclusivo da execução do DAG (igual ao ID da execução). |
DagName |
Nome do fluxo de trabalho do DAG. Por exemplo, Osdu_ingest é o nome do fluxo de trabalho para ingestão de manifesto. |
DagTaskName |
Nome da tarefa para o fluxo de trabalho do DAG. Por exemplo, update_status_running_task é o nome da tarefa para ingestão de manifesto. |
Content |
Mensagens de log de erros (erros ou exceções) que o Airflow emite durante a execução da tarefa. |
LogTimeStamp |
Intervalo de tempo de execuções do DAG. |
LogLevel |
Nível do erro. Os valores são DEBUG , INFO , WARNING , e ERROR . Você pode ver a maioria das mensagens de exceção e de erro filtrando no nível ERROR . |
Falha na execução do DAG
A execução do fluxo de trabalho falhou em Update_status_running_task
ou Update_status_finished_task
e os registros de dados não foram ingeridos.
Possíveis motivos
- A chamada para a API de partição não foi autenticada porque o ID da partição de dados está incorreto.
- Um nome de chave no contexto de execução do corpo da solicitação está incorreto.
- O serviço de fluxo de trabalho não está em execução ou gera erros 5xx.
Status do fluxo de trabalho
O status do fluxo de trabalho é marcado como failed
.
Solução
Verifique os registros de tarefas do Airflow para update_status_running_task
ou update_status_finished_task
. Corrija a carga passando o ID da partição de dados ou o nome da chave correto.
Exemplo de Consulta do Kusto:
OEPAirFlowTask
| where DagName == "Osdu_ingest"
| where DagTaskName == "update_status_running_task"
| where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
| where RunID == '<run_id>'
Exemplo de saída de rastreamento:
[2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
data_partition_id = ctx_payload['data-partition-id']
KeyError: 'data-partition-id'
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264
Falha na validação do esquema
Os registros não foram ingeridos porque a validação do esquema falhou.
Possíveis motivos
- O serviço de esquema está gerando erros de "Esquema não encontrado".
- O corpo do manifesto não está em conformidade com o tipo de esquema.
- As referências do esquema estão incorretas.
- O serviço de esquema está gerando erros 5xx.
Status do fluxo de trabalho
O status do fluxo de trabalho é marcado como finished
. Você não observa uma falha no status do fluxo de trabalho porque as entidades inválidas são ignoradas e a ingestão continua.
Solução
Verifique os registros de tarefas do Airflow para validate_manifest_schema_task
ou process_manifest_task
. Corrija a carga passando o ID da partição de dados ou o nome da chave correto.
Exemplo de Consulta do Kusto:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID == "<run_id>"
| order by ['time'] asc
Exemplo de saída de rastreamento:
Error traces to look out for
[2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
[2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
[2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
[2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
[2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
[2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
[2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
[2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
{'description': 'The stop value/last value of the ReferenceCurveID, '
'typically the end depth of the logging.',
'example': 7500,
'title': 'Sampling Stop',
'type': 'number',
'x-osdu-frame-of-reference': 'UOM'}
On instance['data']['SamplingStop']:
'string-value'
Falha nas verificações de referência
Os registros não foram ingeridos porque as verificações de referência falharam.
Possíveis motivos
- Os registros referenciados não foram encontrados.
- Os registros dos pais não foram encontrados.
- O serviço de pesquisa está gerando erros 5xx.
Status do fluxo de trabalho
O status do fluxo de trabalho é marcado como finished
. Você não observa uma falha no status do fluxo de trabalho porque as entidades inválidas são ignoradas e a ingestão continua.
Solução
Verifique os registros de tarefas do Airflow para provide_manifest_integrity_task
ou process_manifest_task
.
Exemplo de Consulta do Kusto:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
| where Content has 'Search query "'or Content has 'response ids: ['
| where RunID has "<run_id>"
Como não há logs de erros específicos para tarefas de integridade referencial, verifique as instruções do log de depuração para ver se todos os registros externos foram buscados por meio do serviço de pesquisa.
Por exemplo, o exemplo de saída de rastreio a seguir mostra um registro consultado por meio do serviço de pesquisa para integridade referencial:
[2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"
A saída mostra os registros que foram recuperados e estavam no sistema. O objeto de manifesto relacionado que referenciava um registro seria descartado e não seria mais ingerido se você percebesse que alguns dos registros não estavam presentes.
[2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a ']
Marcas legais ou ACLs inválidas no manifesto
Os registros não foram ingeridos porque o manifesto contém marcas legais ou listas de controle de acesso (ACLs) inválidas.
Possíveis motivos
- As ACLs estão incorretas.
- As marcas legais estão incorretas.
- O serviço de armazenamento está gerando erros 5xx.
Status do fluxo de trabalho
O status do fluxo de trabalho é marcado como finished
. Você não observa uma falha no status do fluxo de trabalho.
Solução
Verifique os registros de tarefas do Airflow para process_single_manifest_file_task
ou process_manifest_task
.
Exemplo de Consulta do Kusto:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID has "<run_id>"
| order by ['time'] asc
Exemplo de saída de rastreamento:
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
A saída indica registros que foram recuperados. Os registros de entidade de manifesto que correspondem aos registros de pesquisa ausentes são eliminados e não ingeridos.
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
[2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb
Problemas conhecidos
- Como não há logs de erros específicos para tarefas de integridade referencial, você deve procurar manualmente as instruções de log de depuração para ver se todos os registros externos foram recuperados por meio do serviço de pesquisa.
Próximas etapas
Avance para o tutorial a seguir e saiba como realizar uma ingestão de arquivos baseada em manifesto: