Résoudre les problèmes d’ingestion de manifeste à l’aide des journaux de tâches Airflow
Cet article vous aide à résoudre les problèmes de flux de travail liés à l’ingestion de manifeste dans Azure Data Manager for Energy à l’aide des journaux de tâches Airflow.
Types de flux de travail de DAG pour l’ingestion de manifeste
Il existe deux types de flux de travail de graphe orienté acyclique (DAG) pour l’ingestion de manifeste : manifeste unique et chargement par lots.
Manifeste unique
Un seul fichier manifeste est utilisé pour déclencher le flux de travail d’ingestion de manifeste.
Valeur NomTâcheJournalière | Description |
---|---|
update_status_running_task |
Appelle le service de flux de travail et marque l’état du DAG comme running dans la base de données. |
check_payload_type |
Vérifie si le type d’ingestion est basé sur un chargement par lots ou un manifeste unique. |
validate_manifest_schema_task |
Garantit que tous les types de schémas mentionnés dans le manifeste sont présents et qu’une intégrité de schéma référentielle existe. Toutes les valeurs non valides sont supprimées du manifeste. |
provide_manifest_intergrity_task |
Valide les références dans le manifeste OSDU® R3 et supprime les entités non valides. Cet opérateur est responsable de la validation parent/enfant. Toutes les entités orphelines sont enregistrées et exclues du manifeste validé. Recherche de tous les enregistrements référencés externes. Si la recherche ne donne rien, l’entité manifeste est supprimée. Toutes les références de clé de substitution sont également résolues. |
process_single_manifest_file_task |
Effectue l’ingestion des entités de manifeste finales obtenues à l’étape précédente. Les enregistrements de données sont ingérés via le service de stockage. |
update_status_finished_task |
Appelle le service de flux de travail et marque l’état du DAG comme finished ou failed dans la base de données. |
Chargement par lots
Plusieurs fichiers manifeste font partie de la même demande de service de flux de travail. La section manifeste de la charge utile de la requête est une liste au lieu d’un dictionnaire d’éléments.
Valeur NomTâcheJournalière | Description |
---|---|
update_status_running_task |
Appelle le service de flux de travail et marque l’état du DAG comme running dans la base de données. |
check_payload_type |
Vérifie si le type d’ingestion est basé sur un chargement par lots ou un manifeste unique. |
batch_upload |
Divise la liste des manifestes en trois lots à traiter en parallèle. (Aucun journal de tâches n’est émis.) |
process_manifest_task_(1 / 2 / 3) |
Divise la liste des manifestes en groupes de trois et les traite. Toutes les étapes effectuées dans validate_manifest_schema_task , provide_manifest_intergrity_task et process_single_manifest_file_task sont condensées et exécutées de manière séquentielle dans ces tâches. |
update_status_finished_task |
Appelle le service de flux de travail et marque l’état du DAG comme finished ou failed dans la base de données. |
En fonction du type de charge utile (unique ou batch), la tâche check_payload_type
choisit la branche appropriée et ignore les tâches de l’autre branche.
Prérequis
Vous devez avoir intégré les journaux de tâches Airflow à Azure Monitor. Consulter Intégrer les journaux Airflow à Azure Monitor.
Les colonnes suivantes sont exposées dans les journaux de tâches Airflow pour vous permettre de déboguer le problème :
Nom du paramètre | Description |
---|---|
RunID |
ID d’exécution unique de l’exécution du DAG déclenché. |
CorrelationID |
ID de corrélation unique de l’exécution du DAG (identique à l’ID d’exécution). |
DagName |
Nom du flux de travail du DAG. Par exemple, Osdu_ingest est le nom du flux de travail pour l’ingestion de manifeste. |
DagTaskName |
Nom de la tâche pour le flux de travail du DAG. Par exemple, update_status_running_task est le nom de la tâche pour l’ingestion de manifeste. |
Content |
Messages de journal des erreurs (erreurs ou exceptions) émis par Airflow pendant l’exécution de la tâche. |
LogTimeStamp |
Intervalle de temps des exécutions de DAG. |
LogLevel |
Niveau de l’erreur. Les valeurs sont DEBUG , INFO , WARNING et ERROR . Vous pouvez voir la plupart des messages d’exception et d’erreur en filtrant au niveau ERROR . |
Échec de l’exécution du DAG
L’exécution du flux de travail a échoué à l’étape Update_status_running_task
ou Update_status_finished_task
, et les enregistrements de données n’ont pas été ingérés.
Causes possibles
- L’appel à l’API de partition n’a pas été authentifié, car l’ID de partition de données est incorrect.
- Un nom de clé dans le contexte d’exécution du corps de la requête est incorrect.
- Le service de flux de travail n’est pas en cours d’exécution ou génère des erreurs 5xx.
Statut du flux de travail
L’état du flux de travail est marqué comme failed
.
Solution
Vérifiez les journaux de tâches Airflow pour update_status_running_task
ou update_status_finished_task
. Corrigez la charge utile en transmettant l’ID de partition de données ou le nom de clé correct.
Exemple de requête Kusto :
OEPAirFlowTask
| where DagName == "Osdu_ingest"
| where DagTaskName == "update_status_running_task"
| where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
| where RunID == '<run_id>'
Échantillon de sortie de suivi :
[2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
data_partition_id = ctx_payload['data-partition-id']
KeyError: 'data-partition-id'
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264
Échec de la validation du schéma
Les enregistrements n’ont pas été ingérés, car la validation du schéma a échoué.
Causes possibles
- Le service de schéma lève des erreurs « Schéma introuvable ».
- Le corps du manifeste n’est pas conforme au type de schéma.
- Les références de schéma sont incorrectes.
- Le service de schéma génère des erreurs 5xx.
Statut du flux de travail
L’état du flux de travail est marqué comme finished
. Vous n’observez pas de défaillance dans l’état du flux de travail, car les entités non valides sont ignorées et l’ingestion se poursuit.
Solution
Vérifiez les journaux de tâches Airflow pour validate_manifest_schema_task
ou process_manifest_task
. Corrigez la charge utile en transmettant l’ID de partition de données ou le nom de clé correct.
Exemple de requête Kusto :
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID == "<run_id>"
| order by ['time'] asc
Échantillon de sortie de suivi :
Error traces to look out for
[2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
[2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
[2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
[2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
[2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
[2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
[2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
[2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
{'description': 'The stop value/last value of the ReferenceCurveID, '
'typically the end depth of the logging.',
'example': 7500,
'title': 'Sampling Stop',
'type': 'number',
'x-osdu-frame-of-reference': 'UOM'}
On instance['data']['SamplingStop']:
'string-value'
Échec des vérifications de référence
Les enregistrements n’ont pas été ingérés, car les vérifications de référence ont échoué.
Causes possibles
- Les enregistrements référencés n’ont pas été trouvés.
- Les enregistrements parents n’ont pas été trouvés.
- Le service de recherche génère des erreurs 5xx.
Statut du flux de travail
L’état du flux de travail est marqué comme finished
. Vous n’observez pas de défaillance dans l’état du flux de travail, car les entités non valides sont ignorées et l’ingestion se poursuit.
Solution
Vérifiez les journaux de tâches Airflow pour provide_manifest_integrity_task
ou process_manifest_task
.
Exemple de requête Kusto :
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
| where Content has 'Search query "'or Content has 'response ids: ['
| where RunID has "<run_id>"
Étant donné qu’il n’existe aucun journal des erreurs spécifiquement pour les tâches d’intégrité référentielle, vérifiez les instructions de journal de débogage pour voir si tous les enregistrements externes ont été récupérés via le service de recherche.
Par exemple, l’échantillon de sortie de suivi suivant montre un enregistrement demandé via le service de recherche pour l’intégrité référentielle :
[2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"
La sortie indique les enregistrements qui ont été récupérés et qui se trouvaient dans le système. L’objet manifeste associé qui a référencé un enregistrement est supprimé et n’est plus ingéré si vous avez remarqué que certains enregistrements n’étaient pas présents.
[2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a ']
Balises juridiques ou listes de contrôle d’accès non valides dans le manifeste
Les enregistrements n’ont pas été ingérés, car le manifeste contient des balises juridiques ou des listes de contrôle d’accès (ACL) non valides.
Causes possibles
- Les listes de contrôle d’accès sont incorrectes.
- Les balises juridiques sont incorrectes.
- Le service de stockage génère des erreurs 5xx.
Statut du flux de travail
L’état du flux de travail est marqué comme finished
. Vous ne remarquez pas de défaillance dans l’état du flux de travail.
Solution
Vérifiez les journaux de tâches Airflow pour process_single_manifest_file_task
ou process_manifest_task
.
Exemple de requête Kusto :
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID has "<run_id>"
| order by ['time'] asc
Échantillon de sortie de suivi :
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
La sortie indique les enregistrements qui ont été récupérés. Les enregistrements d’entité manifeste qui correspondent aux enregistrements manquants dans la recherche sont supprimés et non ingérés.
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
[2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb
Problèmes connus
- Étant donné qu’il n’existe aucun journal d’erreurs spécifique pour les tâches d’intégrité référentielle, vous devez rechercher manuellement les instructions de journal de débogage pour voir si tous les enregistrements externes ont été récupérés via le service de recherche.
Étapes suivantes
Passez au tutoriel suivant et découvrez comment effectuer une ingestion de fichier basée sur un manifeste :