Risolvere i problemi di inserimento del manifesto usando i log attività Airflow
Questo articolo illustra come risolvere i problemi del flusso di lavoro con l'inserimento di manifesti in Azure Data Manager per l'energia usando i log attività airflow.
Tipi di flusso di lavoro DAG per l'inserimento di manifesti
Esistono due tipi di flussi di lavoro aciclici diretti per l'inserimento di manifesti: singolo manifesto e caricamento batch.
Singolo manifesto
Un singolo file manifesto viene usato per attivare il flusso di lavoro di inserimento del manifesto.
Valore dagTaskName | Descrizione |
---|---|
update_status_running_task |
Chiama il servizio flusso di lavoro e contrassegna lo stato del dag come running nel database. |
check_payload_type |
Convalida se il tipo di inserimento è batch o singolo manifesto. |
validate_manifest_schema_task |
Assicura che tutti i tipi di schema indicati nel manifesto siano presenti e che sia presente l'integrità dello schema referenziale. Tutti i valori non validi vengono rimossi dal manifesto. |
provide_manifest_intergrity_task |
Convalida i riferimenti all'interno del manifesto OSDU® R3 e rimuove entità non valide. Questo operatore è responsabile della convalida padre/figlio. Tutte le entità orfane vengono registrate ed escluse dal manifesto convalidato. Viene eseguita una ricerca in tutti i record a cui viene fatto riferimento esterno. Se non viene trovato alcuno, l'entità manifesto viene eliminata. Vengono risolti anche tutti i riferimenti di chiave surrogata. |
process_single_manifest_file_task |
Esegue l'inserimento delle entità manifesto finali ottenute dal passaggio precedente. I record di dati vengono inseriti tramite il servizio di archiviazione. |
update_status_finished_task |
Chiama il servizio flusso di lavoro e contrassegna lo stato del dag come finished o failed nel database. |
Caricamento batch
Più file manifesto fanno parte della stessa richiesta di servizio del flusso di lavoro. La sezione manifesto nel payload della richiesta è un elenco anziché un dizionario di elementi.
Valore dagTaskName | Descrizione |
---|---|
update_status_running_task |
Chiama il servizio flusso di lavoro e contrassegna lo stato del dag come running nel database. |
check_payload_type |
Convalida se il tipo di inserimento è batch o singolo manifesto. |
batch_upload |
Divide l'elenco di manifesti in tre batch da elaborare in parallelo. Non vengono generati log attività. |
process_manifest_task_(1 / 2 / 3) |
Divide l'elenco di manifesti in gruppi di tre e li elabora. Tutti i passaggi eseguiti in validate_manifest_schema_task , provide_manifest_intergrity_task e process_single_manifest_file_task vengono condensati ed eseguiti in sequenza in queste attività. |
update_status_finished_task |
Chiama il servizio flusso di lavoro e contrassegna lo stato del dag come finished o failed nel database. |
In base al tipo di payload (singolo o batch), l'attività check_payload_type
sceglie il ramo appropriato e ignora le attività nell'altro ramo.
Prerequisiti
È necessario avere integrato i log attività airflow con Monitoraggio di Azure. Vedere Integrare i log airflow con Monitoraggio di Azure.
Le colonne seguenti vengono esposte nei log attività Airflow per eseguire il debug del problema:
Nome parametro | Descrizione |
---|---|
RunID |
ID di esecuzione univoco dell'esecuzione daG attivata. |
CorrelationID |
ID di correlazione univoco dell'esecuzione del gruppo di sicurezza di database (uguale all'ID esecuzione). |
DagName |
Nome del flusso di lavoro DAG. Ad esempio, Osdu_ingest è il nome del flusso di lavoro per l'inserimento del manifesto. |
DagTaskName |
Nome dell'attività per il flusso di lavoro DAG. Ad esempio, update_status_running_task è il nome dell'attività per l'inserimento del manifesto. |
Content |
Messaggi di log degli errori (errori o eccezioni) generati da Airflow durante l'esecuzione dell'attività. |
LogTimeStamp |
Intervallo di tempo delle esecuzioni di DAG. |
LogLevel |
Livello dell'errore. I valori sono DEBUG , WARNING INFO , e ERROR . È possibile visualizzare la maggior parte dei messaggi di eccezione e di errore filtrando a ERROR livello. |
Esecuzione del DAG non riuscita
L'esecuzione del flusso di lavoro non è riuscita in Update_status_running_task
o Update_status_finished_task
e i record di dati non sono stati inseriti.
Possibili cause
- La chiamata all'API di partizione non è stata autenticata perché l'ID partizione dati non è corretto.
- Un nome di chiave nel contesto di esecuzione del corpo della richiesta non è corretto.
- Il servizio flusso di lavoro non è in esecuzione o genera errori 5xx.
Stato flusso di lavoro
Lo stato del flusso di lavoro è contrassegnato come failed
.
Soluzione
Controllare i log attività Airflow per update_status_running_task
o update_status_finished_task
. Correggere il payload passando l'ID della partizione di dati o il nome della chiave corretti.
Query Kusto di esempio:
OEPAirFlowTask
| where DagName == "Osdu_ingest"
| where DagTaskName == "update_status_running_task"
| where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
| where RunID == '<run_id>'
Output di traccia di esempio:
[2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
data_partition_id = ctx_payload['data-partition-id']
KeyError: 'data-partition-id'
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264
Convalida dello schema non riuscita
I record non sono stati inseriti perché la convalida dello schema non è riuscita.
Possibili cause
- Il servizio schema genera errori "Schema non trovato".
- Il corpo del manifesto non è conforme al tipo di schema.
- I riferimenti allo schema non sono corretti.
- Il servizio schema genera errori 5xx.
Stato flusso di lavoro
Lo stato del flusso di lavoro è contrassegnato come finished
. Non si osserva un errore nello stato del flusso di lavoro perché le entità non valide vengono ignorate e l'inserimento continua.
Soluzione
Controllare i log attività Airflow per validate_manifest_schema_task
o process_manifest_task
. Correggere il payload passando l'ID della partizione di dati o il nome della chiave corretti.
Query Kusto di esempio:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID == "<run_id>"
| order by ['time'] asc
Output di traccia di esempio:
Error traces to look out for
[2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
[2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
[2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
[2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
[2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
[2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
[2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
[2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
{'description': 'The stop value/last value of the ReferenceCurveID, '
'typically the end depth of the logging.',
'example': 7500,
'title': 'Sampling Stop',
'type': 'number',
'x-osdu-frame-of-reference': 'UOM'}
On instance['data']['SamplingStop']:
'string-value'
Controlli di riferimento non riusciti
I record non sono stati inseriti perché i controlli di riferimento non sono riusciti.
Possibili cause
- I record a cui si fa riferimento non sono stati trovati.
- I record padre non sono stati trovati.
- Il servizio di ricerca genera errori 5xx.
Stato flusso di lavoro
Lo stato del flusso di lavoro è contrassegnato come finished
. Non si osserva un errore nello stato del flusso di lavoro perché le entità non valide vengono ignorate e l'inserimento continua.
Soluzione
Controllare i log attività Airflow per provide_manifest_integrity_task
o process_manifest_task
.
Query Kusto di esempio:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
| where Content has 'Search query "'or Content has 'response ids: ['
| where RunID has "<run_id>"
Poiché non sono presenti log degli errori specifici per le attività di integrità referenziale, controllare le istruzioni del log di debug per verificare se tutti i record esterni sono stati recuperati tramite il servizio di ricerca.
Ad esempio, l'output di traccia di esempio seguente mostra un record sottoposto a query tramite il servizio di ricerca per l'integrità referenziale:
[2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"
L'output mostra i record recuperati e presenti nel sistema. L'oggetto manifesto correlato a cui si fa riferimento a un record verrebbe eliminato e non verrà più inserito se si notava che alcuni dei record non erano presenti.
[2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a ']
Tag legali o ACL non validi nel manifesto
I record non sono stati inseriti perché il manifesto contiene tag legali non validi o elenchi di controllo di accesso (ACL).
Possibili cause
- Gli elenchi di controllo di accesso non sono corretti.
- I tag legali non sono corretti.
- Il servizio di archiviazione genera errori 5xx.
Stato flusso di lavoro
Lo stato del flusso di lavoro è contrassegnato come finished
. Non si osserva un errore nello stato del flusso di lavoro.
Soluzione
Controllare i log attività Airflow per process_single_manifest_file_task
o process_manifest_task
.
Query Kusto di esempio:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID has "<run_id>"
| order by ['time'] asc
Output di traccia di esempio:
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
L'output indica i record recuperati. I record di entità manifesto che corrispondono ai record di ricerca mancanti vengono eliminati e non inseriti.
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
[2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb
Problemi noti
- Poiché non sono presenti log degli errori specifici per le attività di integrità referenziale, è necessario cercare manualmente le istruzioni del log di debug per verificare se tutti i record esterni sono stati recuperati tramite il servizio di ricerca.
Passaggi successivi
Passare all'esercitazione seguente e imparare a eseguire un inserimento di file basato su manifesto: