Share via


Résoudre les problèmes d’ingestion de manifeste à l’aide des journaux de tâches Airflow

Cet article vous aide à résoudre les problèmes de flux de travail liés à l’ingestion de manifeste dans Azure Data Manager for Energy à l’aide des journaux de tâches Airflow.

Types de flux de travail de DAG pour l’ingestion de manifeste

Il existe deux types de flux de travail de graphe orienté acyclique (DAG) pour l’ingestion de manifeste : manifeste unique et chargement par lots.

Manifeste unique

Un seul fichier manifeste est utilisé pour déclencher le flux de travail d’ingestion de manifeste.

Valeur NomTâcheJournalière Description
update_status_running_task Appelle le service de flux de travail et marque l’état du DAG comme running dans la base de données.
check_payload_type Vérifie si le type d’ingestion est basé sur un chargement par lots ou un manifeste unique.
validate_manifest_schema_task Garantit que tous les types de schémas mentionnés dans le manifeste sont présents et qu’une intégrité de schéma référentielle existe. Toutes les valeurs non valides sont supprimées du manifeste.
provide_manifest_intergrity_task Valide les références dans le manifeste OSDU® R3 et supprime les entités non valides. Cet opérateur est responsable de la validation parent/enfant. Toutes les entités orphelines sont enregistrées et exclues du manifeste validé. Recherche de tous les enregistrements référencés externes. Si la recherche ne donne rien, l’entité manifeste est supprimée. Toutes les références de clé de substitution sont également résolues.
process_single_manifest_file_task Effectue l’ingestion des entités de manifeste finales obtenues à l’étape précédente. Les enregistrements de données sont ingérés via le service de stockage.
update_status_finished_task Appelle le service de flux de travail et marque l’état du DAG comme finished ou failed dans la base de données.

Chargement par lots

Plusieurs fichiers manifeste font partie de la même demande de service de flux de travail. La section manifeste de la charge utile de la requête est une liste au lieu d’un dictionnaire d’éléments.

Valeur NomTâcheJournalière Description
update_status_running_task Appelle le service de flux de travail et marque l’état du DAG comme running dans la base de données.
check_payload_type Vérifie si le type d’ingestion est basé sur un chargement par lots ou un manifeste unique.
batch_upload Divise la liste des manifestes en trois lots à traiter en parallèle. (Aucun journal de tâches n’est émis.)
process_manifest_task_(1 / 2 / 3) Divise la liste des manifestes en groupes de trois et les traite. Toutes les étapes effectuées dans validate_manifest_schema_task, provide_manifest_intergrity_tasket process_single_manifest_file_task sont condensées et exécutées de manière séquentielle dans ces tâches.
update_status_finished_task Appelle le service de flux de travail et marque l’état du DAG comme finished ou failed dans la base de données.

En fonction du type de charge utile (unique ou batch), la tâche check_payload_typechoisit la branche appropriée et ignore les tâches de l’autre branche.

Prérequis

Vous devez avoir intégré les journaux de tâches Airflow à Azure Monitor. Consulter Intégrer les journaux Airflow à Azure Monitor.

Les colonnes suivantes sont exposées dans les journaux de tâches Airflow pour vous permettre de déboguer le problème :

Nom du paramètre Description
RunID ID d’exécution unique de l’exécution du DAG déclenché.
CorrelationID ID de corrélation unique de l’exécution du DAG (identique à l’ID d’exécution).
DagName Nom du flux de travail du DAG. Par exemple, Osdu_ingest est le nom du flux de travail pour l’ingestion de manifeste.
DagTaskName Nom de la tâche pour le flux de travail du DAG. Par exemple, update_status_running_task est le nom de la tâche pour l’ingestion de manifeste.
Content Messages de journal des erreurs (erreurs ou exceptions) émis par Airflow pendant l’exécution de la tâche.
LogTimeStamp Intervalle de temps des exécutions de DAG.
LogLevel Niveau de l’erreur. Les valeurs sont DEBUG, INFO, WARNING et ERROR. Vous pouvez voir la plupart des messages d’exception et d’erreur en filtrant au niveau ERROR.

Échec de l’exécution du DAG

L’exécution du flux de travail a échoué à l’étape Update_status_running_task ou Update_status_finished_task, et les enregistrements de données n’ont pas été ingérés.

Causes possibles

  • L’appel à l’API de partition n’a pas été authentifié, car l’ID de partition de données est incorrect.
  • Un nom de clé dans le contexte d’exécution du corps de la requête est incorrect.
  • Le service de flux de travail n’est pas en cours d’exécution ou génère des erreurs 5xx.

Statut du flux de travail

L’état du flux de travail est marqué comme failed.

Solution

Vérifiez les journaux de tâches Airflow pour update_status_running_task ou update_status_finished_task. Corrigez la charge utile en transmettant l’ID de partition de données ou le nom de clé correct.

Exemple de requête Kusto :

    OEPAirFlowTask
        | where DagName == "Osdu_ingest"
        | where DagTaskName == "update_status_running_task"
        | where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
        | where RunID == '<run_id>'

Échantillon de sortie de suivi :

    [2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
        data_partition_id = ctx_payload['data-partition-id']
    KeyError: 'data-partition-id'
    
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264

Échec de la validation du schéma

Les enregistrements n’ont pas été ingérés, car la validation du schéma a échoué.

Causes possibles

  • Le service de schéma lève des erreurs « Schéma introuvable ».
  • Le corps du manifeste n’est pas conforme au type de schéma.
  • Les références de schéma sont incorrectes.
  • Le service de schéma génère des erreurs 5xx.

Statut du flux de travail

L’état du flux de travail est marqué comme finished. Vous n’observez pas de défaillance dans l’état du flux de travail, car les entités non valides sont ignorées et l’ingestion se poursuit.

Solution

Vérifiez les journaux de tâches Airflow pour validate_manifest_schema_task ou process_manifest_task. Corrigez la charge utile en transmettant l’ID de partition de données ou le nom de clé correct.

Exemple de requête Kusto :

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID == "<run_id>"
    | order by ['time'] asc  

Échantillon de sortie de suivi :

    Error traces to look out for
    [2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
    [2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
    [2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
    [2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
    [2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
    [2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
    
    Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
        {'description': 'The stop value/last value of the ReferenceCurveID, '
                        'typically the end depth of the logging.',
         'example': 7500,
         'title': 'Sampling Stop',
         'type': 'number',
         'x-osdu-frame-of-reference': 'UOM'}
    
    On instance['data']['SamplingStop']:
        'string-value'

Échec des vérifications de référence

Les enregistrements n’ont pas été ingérés, car les vérifications de référence ont échoué.

Causes possibles

  • Les enregistrements référencés n’ont pas été trouvés.
  • Les enregistrements parents n’ont pas été trouvés.
  • Le service de recherche génère des erreurs 5xx.

Statut du flux de travail

L’état du flux de travail est marqué comme finished. Vous n’observez pas de défaillance dans l’état du flux de travail, car les entités non valides sont ignorées et l’ingestion se poursuit.

Solution

Vérifiez les journaux de tâches Airflow pour provide_manifest_integrity_task ou process_manifest_task.

Exemple de requête Kusto :

    OEPAirFlowTask
        | where DagName has "Osdu_ingest"
        | where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
        | where Content has 'Search query "'or Content has 'response ids: ['
        | where RunID has "<run_id>"

Étant donné qu’il n’existe aucun journal des erreurs spécifiquement pour les tâches d’intégrité référentielle, vérifiez les instructions de journal de débogage pour voir si tous les enregistrements externes ont été récupérés via le service de recherche.

Par exemple, l’échantillon de sortie de suivi suivant montre un enregistrement demandé via le service de recherche pour l’intégrité référentielle :

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"

La sortie indique les enregistrements qui ont été récupérés et qui se trouvaient dans le système. L’objet manifeste associé qui a référencé un enregistrement est supprimé et n’est plus ingéré si vous avez remarqué que certains enregistrements n’étaient pas présents.

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a    ']

Les enregistrements n’ont pas été ingérés, car le manifeste contient des balises juridiques ou des listes de contrôle d’accès (ACL) non valides.

Causes possibles

  • Les listes de contrôle d’accès sont incorrectes.
  • Les balises juridiques sont incorrectes.
  • Le service de stockage génère des erreurs 5xx.

Statut du flux de travail

L’état du flux de travail est marqué comme finished. Vous ne remarquez pas de défaillance dans l’état du flux de travail.

Solution

Vérifiez les journaux de tâches Airflow pour process_single_manifest_file_task ou process_manifest_task.

Exemple de requête Kusto :

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID has "<run_id>"
    | order by ['time'] asc 

Échantillon de sortie de suivi :

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
    

La sortie indique les enregistrements qui ont été récupérés. Les enregistrements d’entité manifeste qui correspondent aux enregistrements manquants dans la recherche sont supprimés et non ingérés.

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
    [2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb

Problèmes connus

  • Étant donné qu’il n’existe aucun journal d’erreurs spécifique pour les tâches d’intégrité référentielle, vous devez rechercher manuellement les instructions de journal de débogage pour voir si tous les enregistrements externes ont été récupérés via le service de recherche.

Étapes suivantes

Passez au tutoriel suivant et découvrez comment effectuer une ingestion de fichier basée sur un manifeste :

Références