Share via


Risolvere i problemi di inserimento del manifesto usando i log attività Airflow

Questo articolo illustra come risolvere i problemi del flusso di lavoro con l'inserimento di manifesti in Azure Data Manager per l'energia usando i log attività airflow.

Tipi di flusso di lavoro DAG per l'inserimento di manifesti

Esistono due tipi di flussi di lavoro aciclici diretti per l'inserimento di manifesti: singolo manifesto e caricamento batch.

Singolo manifesto

Un singolo file manifesto viene usato per attivare il flusso di lavoro di inserimento del manifesto.

Valore dagTaskName Descrizione
update_status_running_task Chiama il servizio flusso di lavoro e contrassegna lo stato del dag come running nel database.
check_payload_type Convalida se il tipo di inserimento è batch o singolo manifesto.
validate_manifest_schema_task Assicura che tutti i tipi di schema indicati nel manifesto siano presenti e che sia presente l'integrità dello schema referenziale. Tutti i valori non validi vengono rimossi dal manifesto.
provide_manifest_intergrity_task Convalida i riferimenti all'interno del manifesto OSDU® R3 e rimuove entità non valide. Questo operatore è responsabile della convalida padre/figlio. Tutte le entità orfane vengono registrate ed escluse dal manifesto convalidato. Viene eseguita una ricerca in tutti i record a cui viene fatto riferimento esterno. Se non viene trovato alcuno, l'entità manifesto viene eliminata. Vengono risolti anche tutti i riferimenti di chiave surrogata.
process_single_manifest_file_task Esegue l'inserimento delle entità manifesto finali ottenute dal passaggio precedente. I record di dati vengono inseriti tramite il servizio di archiviazione.
update_status_finished_task Chiama il servizio flusso di lavoro e contrassegna lo stato del dag come finished o failed nel database.

Caricamento batch

Più file manifesto fanno parte della stessa richiesta di servizio del flusso di lavoro. La sezione manifesto nel payload della richiesta è un elenco anziché un dizionario di elementi.

Valore dagTaskName Descrizione
update_status_running_task Chiama il servizio flusso di lavoro e contrassegna lo stato del dag come running nel database.
check_payload_type Convalida se il tipo di inserimento è batch o singolo manifesto.
batch_upload Divide l'elenco di manifesti in tre batch da elaborare in parallelo. Non vengono generati log attività.
process_manifest_task_(1 / 2 / 3) Divide l'elenco di manifesti in gruppi di tre e li elabora. Tutti i passaggi eseguiti in validate_manifest_schema_task, provide_manifest_intergrity_taske process_single_manifest_file_task vengono condensati ed eseguiti in sequenza in queste attività.
update_status_finished_task Chiama il servizio flusso di lavoro e contrassegna lo stato del dag come finished o failed nel database.

In base al tipo di payload (singolo o batch), l'attività check_payload_type sceglie il ramo appropriato e ignora le attività nell'altro ramo.

Prerequisiti

È necessario avere integrato i log attività airflow con Monitoraggio di Azure. Vedere Integrare i log airflow con Monitoraggio di Azure.

Le colonne seguenti vengono esposte nei log attività Airflow per eseguire il debug del problema:

Nome parametro Descrizione
RunID ID di esecuzione univoco dell'esecuzione daG attivata.
CorrelationID ID di correlazione univoco dell'esecuzione del gruppo di sicurezza di database (uguale all'ID esecuzione).
DagName Nome del flusso di lavoro DAG. Ad esempio, Osdu_ingest è il nome del flusso di lavoro per l'inserimento del manifesto.
DagTaskName Nome dell'attività per il flusso di lavoro DAG. Ad esempio, update_status_running_task è il nome dell'attività per l'inserimento del manifesto.
Content Messaggi di log degli errori (errori o eccezioni) generati da Airflow durante l'esecuzione dell'attività.
LogTimeStamp Intervallo di tempo delle esecuzioni di DAG.
LogLevel Livello dell'errore. I valori sono DEBUG, WARNINGINFO, e ERROR. È possibile visualizzare la maggior parte dei messaggi di eccezione e di errore filtrando a ERROR livello.

Esecuzione del DAG non riuscita

L'esecuzione del flusso di lavoro non è riuscita in Update_status_running_task o Update_status_finished_taske i record di dati non sono stati inseriti.

Possibili cause

  • La chiamata all'API di partizione non è stata autenticata perché l'ID partizione dati non è corretto.
  • Un nome di chiave nel contesto di esecuzione del corpo della richiesta non è corretto.
  • Il servizio flusso di lavoro non è in esecuzione o genera errori 5xx.

Stato flusso di lavoro

Lo stato del flusso di lavoro è contrassegnato come failed.

Soluzione

Controllare i log attività Airflow per update_status_running_task o update_status_finished_task. Correggere il payload passando l'ID della partizione di dati o il nome della chiave corretti.

Query Kusto di esempio:

    OEPAirFlowTask
        | where DagName == "Osdu_ingest"
        | where DagTaskName == "update_status_running_task"
        | where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
        | where RunID == '<run_id>'

Output di traccia di esempio:

    [2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
        data_partition_id = ctx_payload['data-partition-id']
    KeyError: 'data-partition-id'
    
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264

Convalida dello schema non riuscita

I record non sono stati inseriti perché la convalida dello schema non è riuscita.

Possibili cause

  • Il servizio schema genera errori "Schema non trovato".
  • Il corpo del manifesto non è conforme al tipo di schema.
  • I riferimenti allo schema non sono corretti.
  • Il servizio schema genera errori 5xx.

Stato flusso di lavoro

Lo stato del flusso di lavoro è contrassegnato come finished. Non si osserva un errore nello stato del flusso di lavoro perché le entità non valide vengono ignorate e l'inserimento continua.

Soluzione

Controllare i log attività Airflow per validate_manifest_schema_task o process_manifest_task. Correggere il payload passando l'ID della partizione di dati o il nome della chiave corretti.

Query Kusto di esempio:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID == "<run_id>"
    | order by ['time'] asc  

Output di traccia di esempio:

    Error traces to look out for
    [2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
    [2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
    [2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
    [2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
    [2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
    [2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
    
    Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
        {'description': 'The stop value/last value of the ReferenceCurveID, '
                        'typically the end depth of the logging.',
         'example': 7500,
         'title': 'Sampling Stop',
         'type': 'number',
         'x-osdu-frame-of-reference': 'UOM'}
    
    On instance['data']['SamplingStop']:
        'string-value'

Controlli di riferimento non riusciti

I record non sono stati inseriti perché i controlli di riferimento non sono riusciti.

Possibili cause

  • I record a cui si fa riferimento non sono stati trovati.
  • I record padre non sono stati trovati.
  • Il servizio di ricerca genera errori 5xx.

Stato flusso di lavoro

Lo stato del flusso di lavoro è contrassegnato come finished. Non si osserva un errore nello stato del flusso di lavoro perché le entità non valide vengono ignorate e l'inserimento continua.

Soluzione

Controllare i log attività Airflow per provide_manifest_integrity_task o process_manifest_task.

Query Kusto di esempio:

    OEPAirFlowTask
        | where DagName has "Osdu_ingest"
        | where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
        | where Content has 'Search query "'or Content has 'response ids: ['
        | where RunID has "<run_id>"

Poiché non sono presenti log degli errori specifici per le attività di integrità referenziale, controllare le istruzioni del log di debug per verificare se tutti i record esterni sono stati recuperati tramite il servizio di ricerca.

Ad esempio, l'output di traccia di esempio seguente mostra un record sottoposto a query tramite il servizio di ricerca per l'integrità referenziale:

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"

L'output mostra i record recuperati e presenti nel sistema. L'oggetto manifesto correlato a cui si fa riferimento a un record verrebbe eliminato e non verrà più inserito se si notava che alcuni dei record non erano presenti.

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a    ']

I record non sono stati inseriti perché il manifesto contiene tag legali non validi o elenchi di controllo di accesso (ACL).

Possibili cause

  • Gli elenchi di controllo di accesso non sono corretti.
  • I tag legali non sono corretti.
  • Il servizio di archiviazione genera errori 5xx.

Stato flusso di lavoro

Lo stato del flusso di lavoro è contrassegnato come finished. Non si osserva un errore nello stato del flusso di lavoro.

Soluzione

Controllare i log attività Airflow per process_single_manifest_file_task o process_manifest_task.

Query Kusto di esempio:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID has "<run_id>"
    | order by ['time'] asc 

Output di traccia di esempio:

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
    

L'output indica i record recuperati. I record di entità manifesto che corrispondono ai record di ricerca mancanti vengono eliminati e non inseriti.

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
    [2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb

Problemi noti

  • Poiché non sono presenti log degli errori specifici per le attività di integrità referenziale, è necessario cercare manualmente le istruzioni del log di debug per verificare se tutti i record esterni sono stati recuperati tramite il servizio di ricerca.

Passaggi successivi

Passare all'esercitazione seguente e imparare a eseguire un inserimento di file basato su manifesto:

Riferimenti