Beheben von Problemen bei der Manifesterfassung mithilfe von Airflow-Aufgabenprotokollen
Dieser Artikel hilft Ihnen bei der Behandlung von Workflowproblemen bei der Manifesterfassung in Azure Data Manager for Energy mithilfe von Airflow-Aufgabenprotokollen.
DAG-Workflowtypen bei der Manifesterfassung
Es gibt zwei Arten von DAG-Workflows (Directed Acyclic Graph, gerichteter azyklischer Graph) für die Manifesterfassung: einzelnes Manifest und Batchupload.
Einzelnes Manifest
Eine einzelne Manifestdatei wird verwendet, um den Manifesterfassungsworkflow auszulösen.
DagTaskName-Wert | Beschreibung |
---|---|
update_status_running_task |
Ruft den Workflowdienst auf und kennzeichnet den DAG-Status in der Datenbank als running . |
check_payload_type |
Überprüft, ob als Erfassungstyp die Batcherfassung oder die Erfassung mit einem einzelnen Manifest verwendet wird. |
validate_manifest_schema_task |
Stellt sicher, dass alle im Manifest erwähnten Schematypen vorhanden sind und referenzielle Schemaintegrität besteht. Alle ungültigen Werte werden aus dem Manifest entfernt. |
provide_manifest_integrity_task |
Überprüft Verweise innerhalb des OSDU® R3-Manifests und entfernt ungültige Entitäten. Dieser Operator ist für die Überprüfung übergeordneter/untergeordneter Elemente verantwortlich. Alle verwaisten Entitäten werden protokolliert und aus dem überprüften Manifest ausgeschlossen. Alle externen Datensätze, auf die verwiesen wird, werden durchsucht. Wenn keine gefunden werden, wird die Manifestentität gelöscht. Alle Ersatzschlüsselverweise werden ebenfalls aufgelöst. |
process_single_manifest_file_task |
Führt die Erfassung der endgültigen Manifestentitäten aus, die im vorherigen Schritt abgerufen wurden. Datensätze werden über den Speicherdienst erfasst. |
update_status_finished_task |
Ruft den Workflowdienst auf und kennzeichnet den DAG-Status in der Datenbank als finished oder failed . |
Batchupload
Mehrere Manifestdateien sind Teil derselben Workflowdienstanforderung. Der Manifestabschnitt in der Anforderungsnutzlast ist eine Liste anstelle eines Wörterbuchs mit Elementen.
DagTaskName-Wert | Beschreibung |
---|---|
update_status_running_task |
Ruft den Workflowdienst auf und kennzeichnet den DAG-Status in der Datenbank als running . |
check_payload_type |
Überprüft, ob als Erfassungstyp die Batcherfassung oder die Erfassung mit einem einzelnen Manifest verwendet wird. |
batch_upload |
Teilt die Liste der Manifeste in drei Batches auf, die parallel verarbeitet werden sollen. (Es werden keine Aufgabenprotokolle ausgegeben.) |
process_manifest_task_(1 / 2 / 3) |
Teilt die Liste der Manifeste in Dreiergruppen auf und verarbeitet sie. Alle in validate_manifest_schema_task , provide_manifest_integrity_task und process_single_manifest_file_task ausgeführten Schritte werden in diesen Aufgaben komprimiert und nacheinander ausgeführt. |
update_status_finished_task |
Ruft den Workflowdienst auf und kennzeichnet den DAG-Status in der Datenbank als finished oder failed . |
Basierend auf dem Nutzlasttyp (Einzel- oder Batchverarbeitung) wählt die Aufgabe check_payload_type
den entsprechenden Branch aus und überspringt die Aufgaben im anderen Branch.
Voraussetzungen
Sie sollten Airflow-Aufgabenprotokolle in Azure Monitor integriert haben. Weitere Informationen finden Sie unter Integrieren von Airflow-Protokollen in Azure Monitor.
Die folgenden Spalten werden in den Airflow-Aufgabenprotokollen verfügbar gemacht, damit Sie das Problem debuggen können:
Parametername | Beschreibung |
---|---|
RunID |
Eindeutige Ausführungs-ID der ausgelösten DAG-Ausführung |
CorrelationID |
Eindeutige Korrelations-ID der DAG-Ausführung (identisch mit der Ausführungs-ID) |
DagName |
DAG-Workflowname. Osdu_ingest ist beispielsweise der Workflowname für die Manifesterfassung. |
DagTaskName |
Aufgabenname für den DAG-Workflow. update_status_running_task ist beispielsweise der Aufgabenname für die Manifesterfassung. |
Content |
Fehlerprotokollmeldungen (Fehler oder Ausnahmen), die Airflow während der Aufgabenausführung ausgibt |
LogTimeStamp |
Das Zeitintervall von DAG-Ausführungen |
LogLevel |
Fehlerstufe. Gültige Werte sind DEBUG , INFO , WARNING und ERROR . Die meisten Ausnahme- und Fehlermeldungen werden angezeigt, wenn Sie nach der Stufe ERROR filtern. |
Fehlerhafte DAG-Ausführung
Bei der Ausführung des Workflows ist in Update_status_running_task
oder Update_status_finished_task
ein Fehler aufgetreten, und die Datensätze wurden nicht erfasst.
Mögliche Ursachen
- Der Aufruf der Partitions-API wurde nicht authentifiziert, da die Datenpartitions-ID falsch ist.
- Ein Schlüsselname im Ausführungskontext des Anforderungstexts ist falsch.
- Der Workflowdienst wird nicht ausgeführt oder löst Fehler vom Typ „5xx“ aus.
Workflowstatus
Der Workflowstatus ist als failed
gekennzeichnet.
Lösung
Überprüfen Sie die Airflow-Aufgabenprotokolle auf update_status_running_task
oder update_status_finished_task
. Korrigieren Sie die Nutzlast, indem Sie die richtige Datenpartitions-ID oder den richtigen Schlüsselnamen übergeben.
Kusto-Beispielabfrage:
OEPAirFlowTask
| where DagName == "Osdu_ingest"
| where DagTaskName == "update_status_running_task"
| where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
| where RunID == '<run_id>'
Beispiel für eine Ablaufverfolgungsausgabe:
[2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
data_partition_id = ctx_payload['data-partition-id']
KeyError: 'data-partition-id'
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264
Fehlerhafte Schemavalidierung
Datensätze wurden nicht aufgenommen, da bei der Schemaüberprüfung ein Fehler aufgetreten ist.
Mögliche Ursachen
- Der Schemadienst löst Fehler vom Typ „Schema nicht gefunden“ aus.
- Der Manifesttext entspricht nicht dem Schematyp.
- Die Schemaverweise sind falsch.
- Der Schemadienst löst Fehler vom Typ „5xx“ aus.
Workflowstatus
Der Workflowstatus ist als finished
gekennzeichnet. Sie sehen keinen Fehler im Workflowstatus, da die ungültigen Entitäten übersprungen werden und die Aufnahme fortgesetzt wird.
Lösung
Überprüfen Sie die Airflow-Aufgabenprotokolle auf validate_manifest_schema_task
oder process_manifest_task
. Korrigieren Sie die Nutzlast, indem Sie die richtige Datenpartitions-ID oder den richtigen Schlüsselnamen übergeben.
Kusto-Beispielabfrage:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID == "<run_id>"
| order by ['time'] asc
Beispiel für eine Ablaufverfolgungsausgabe:
Error traces to look out for
[2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
[2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
[2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
[2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
[2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
[2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
[2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
[2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
{'description': 'The stop value/last value of the ReferenceCurveID, '
'typically the end depth of the logging.',
'example': 7500,
'title': 'Sampling Stop',
'type': 'number',
'x-osdu-frame-of-reference': 'UOM'}
On instance['data']['SamplingStop']:
'string-value'
Fehlerhafte Verweisüberprüfungen
Datensätze wurden nicht erfasst, da bei den Verweisüberprüfungen ein Fehler aufgetreten ist.
Mögliche Ursachen
- Referenzierte Datensätze wurden nicht gefunden.
- Übergeordnete Datensätze wurden nicht gefunden.
- Der Suchdienst löst Fehler vom Typ „5xx“ aus.
Workflowstatus
Der Workflowstatus ist als finished
gekennzeichnet. Sie sehen keinen Fehler im Workflowstatus, da die ungültigen Entitäten übersprungen werden und die Aufnahme fortgesetzt wird.
Lösung
Überprüfen Sie die Airflow-Aufgabenprotokolle auf provide_manifest_integrity_task
oder process_manifest_task
.
Kusto-Beispielabfrage:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
| where Content has 'Search query "'or Content has 'response ids: ['
| where RunID has "<run_id>"
Da keine Fehlerprotokolle speziell für Aufgaben zur referenziellen Integrität vorhanden sind, überprüfen Sie die Debugprotokollanweisungen, um festzustellen, ob alle externen Datensätze über den Suchdienst abgerufen wurden.
Das folgende Beispiel einer Ablaufverfolgungsausgabe zeigt beispielsweise einen über den Suchdienst abgefragten Datensatz für die referenzielle Integrität:
[2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"
Die Ausgabe zeigt die Datensätze, die abgerufen wurden und sich im System befanden. Das zugehörige Manifestobjekt, das auf einen Datensatz verwies, wird gelöscht und nicht mehr erfasst, wenn Sie feststellen, dass einige der Datensätze nicht vorhanden waren.
[2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a ']
Ungültige rechtliche Tags oder ACLs im Manifest
Datensätze wurden nicht erfasst, da das Manifest ungültige rechtliche Tags oder Zugriffssteuerungslisten (Access Control Lists, ACLs) enthält.
Mögliche Ursachen
- ACLs sind nicht korrekt.
- Rechtliche Tags sind nicht korrekt.
- Der Speicherdienst löst Fehler vom Typ „5xx“ aus.
Workflowstatus
Der Workflowstatus ist als finished
gekennzeichnet. Sie sehen keinen Fehler im Workflowstatus.
Lösung
Überprüfen Sie die Airflow-Aufgabenprotokolle auf process_single_manifest_file_task
oder process_manifest_task
.
Kusto-Beispielabfrage:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID has "<run_id>"
| order by ['time'] asc
Beispiel für eine Ablaufverfolgungsausgabe:
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
Die Ausgabe gibt Datensätze an, die abgerufen wurden. Manifestentitätseinträge, die fehlenden Suchdatensätze entsprechen, werden gelöscht und nicht erfasst.
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
[2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb
Bekannte Probleme
- Da keine speziellen Fehlerprotokolle für Aufgaben zur referenziellen Integrität vorhanden sind, müssen Sie manuell nach den Debugprotokollanweisungen suchen, um zu ermitteln, ob alle externen Datensätze über den Suchdienst abgerufen wurden.
Nächste Schritte
Fahren Sie mit dem folgenden Tutorial fort, und erfahren Sie, wie Sie eine manifestbasierte Dateierfassung durchführen: