使用 Airflow 工作記錄針對指令清單擷取問題進行疑難解答
本文可協助您使用 Airflow 工作記錄,針對 Azure Data Manager for Energy 中的指令清單擷取工作流程問題進行疑難解答。
指令清單擷取DAG工作流程類型
指令清單擷取有兩種類型的有向無循環圖表 (DAG) 工作流程:單一指令清單和批次上傳。
單一指令清單
單一指令清單檔案可用來觸發指令清單擷取工作流程。
DagTaskName 值 | 描述 |
---|---|
update_status_running_task |
呼叫工作流程服務,並將 DAG 的狀態標示為 running 資料庫中。 |
check_payload_type |
驗證擷取的類型是批次或單一指令清單。 |
validate_manifest_schema_task |
確保指令清單中提及的所有架構類型都存在,而且有引用架構完整性。 所有無效的值都會從指令清單收回。 |
provide_manifest_intergrity_task |
驗證 OSDU® R3 指令清單內的參考,並移除無效的實體。 此運算子負責父/子驗證。 所有類似孤立的實體都會記錄並排除在已驗證的指令清單中。 系統會搜尋任何外部參考的記錄。 如果找不到任何專案,則會卸除指令清單實體。 也會解析所有 Surrogate 索引鍵參考。 |
process_single_manifest_file_task |
執行從上一個步驟取得的最終指令清單實體擷取。 數據記錄會透過記憶體服務內嵌。 |
update_status_finished_task |
呼叫工作流程服務,並將 DAG 的狀態標示為 finished 或 failed 資料庫中。 |
批次上傳
多個指令清單檔案是相同工作流程服務要求的一部分。 要求承載中的指令清單區段是清單,而不是專案的字典。
DagTaskName 值 | 描述 |
---|---|
update_status_running_task |
呼叫工作流程服務,並將 DAG 的狀態標示為 running 資料庫中。 |
check_payload_type |
驗證擷取的類型是批次或單一指令清單。 |
batch_upload |
將指令清單清單分成三個批次,以平行方式處理。 (未發出任何工作記錄檔。 |
process_manifest_task_(1 / 2 / 3) |
將指令清單清單分成三個群組,並加以處理。 、 provide_manifest_intergrity_task 和 process_single_manifest_file_task 中validate_manifest_schema_task 執行的所有步驟都會壓縮並依序在這些工作中執行。 |
update_status_finished_task |
呼叫工作流程服務,並將 DAG 的狀態標示為 finished 或 failed 資料庫中。 |
根據承載類型(單一或批次),工作 check_payload_type
會選擇適當的分支,並略過其他分支中的工作。
必要條件
您應該已整合 Airflow 工作記錄與 Azure 監視器。 請參閱 整合 Airflow 記錄與 Azure 監視器。
下列數據行會在 Airflow 工作記錄中公開,讓您偵錯問題:
參數名稱 | 描述 |
---|---|
RunID |
觸發之 DAG 執行的唯一執行標識碼。 |
CorrelationID |
DAG 執行的唯一相互關聯標識碼(與執行標識符相同)。 |
DagName |
DAG 工作流程名稱。 例如, Osdu_ingest 是指令清單擷取的工作流程名稱。 |
DagTaskName |
DAG 工作流程的工作名稱。 例如, update_status_running_task 是指令清單擷取的工作名稱。 |
Content |
在工作執行期間,Airflow 發出的錯誤記錄訊息(錯誤或例外狀況)。 |
LogTimeStamp |
DAG 執行的時間間隔。 |
LogLevel |
錯誤的層級。 DEBUG 值為 、INFO 、 WARNING 和 ERROR 。 您可以藉由篩選層級 ERROR 來查看大部分的例外狀況和錯誤訊息。 |
執行失敗的 DAG
或 中的Update_status_running_task
Update_status_finished_task
工作流程執行失敗,而且數據記錄並未內嵌。
可能的原因
- 未驗證數據分割 API 的呼叫,因為數據分割標識元不正確。
- 要求主體執行內容中的索引鍵名稱不正確。
- 工作流程服務未執行或擲回 5xx 錯誤。
工作流程狀態
工作流程狀態會標示為 failed
。
解決方案
檢查 或update_status_finished_task
的 Airflow 工作記錄update_status_running_task
。 傳遞正確的數據分割識別碼或索引鍵名稱,以修正承載。
範例 Kusto 查詢:
OEPAirFlowTask
| where DagName == "Osdu_ingest"
| where DagTaskName == "update_status_running_task"
| where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
| where RunID == '<run_id>'
範例追蹤輸出:
[2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
data_partition_id = ctx_payload['data-partition-id']
KeyError: 'data-partition-id'
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264
失敗的架構驗證
記錄未擷取,因為架構驗證失敗。
可能的原因
- 架構服務擲回「找不到架構」錯誤。
- 指令清單主體不符合架構類型。
- 架構參考不正確。
- 架構服務擲回 5xx 錯誤。
工作流程狀態
工作流程狀態會標示為 finished
。 您不會在工作流程狀態中觀察到失敗,因為會略過無效的實體並繼續進行擷取。
解決方案
檢查 或process_manifest_task
的 Airflow 工作記錄validate_manifest_schema_task
。 傳遞正確的數據分割識別碼或索引鍵名稱,以修正承載。
範例 Kusto 查詢:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID == "<run_id>"
| order by ['time'] asc
範例追蹤輸出:
Error traces to look out for
[2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
[2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
[2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
[2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
[2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
[2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
[2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
[2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
{'description': 'The stop value/last value of the ReferenceCurveID, '
'typically the end depth of the logging.',
'example': 7500,
'title': 'Sampling Stop',
'type': 'number',
'x-osdu-frame-of-reference': 'UOM'}
On instance['data']['SamplingStop']:
'string-value'
失敗的參考檢查
記錄並未擷取,因為參考檢查失敗。
可能的原因
- 找不到參考的記錄。
- 找不到父記錄。
- 搜尋服務擲回 5xx 錯誤。
工作流程狀態
工作流程狀態會標示為 finished
。 您不會在工作流程狀態中觀察到失敗,因為會略過無效的實體並繼續進行擷取。
解決方案
檢查 或process_manifest_task
的 Airflow 工作記錄provide_manifest_integrity_task
。
範例 Kusto 查詢:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
| where Content has 'Search query "'or Content has 'response ids: ['
| where RunID has "<run_id>"
由於參考完整性工作沒有特別的錯誤記錄,請檢查偵錯記錄語句,以查看是否透過搜尋服務擷取所有外部記錄。
例如,下列範例追蹤輸出會顯示透過搜尋服務查詢的記錄,以取得引用完整性:
[2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"
輸出會顯示已擷取且位於系統中的記錄。 參考記錄的相關指令清單物件將會卸除,如果您注意到某些記錄不存在,就不會再內嵌。
[2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a ']
指令清單中無效的法律標籤或 ACL
記錄並未內嵌,因為指令清單包含無效的法律標籤或訪問控制清單(ACL)。
可能的原因
- ACL 不正確。
- 法律標籤不正確。
- 記憶體服務擲回 5xx 錯誤。
工作流程狀態
工作流程狀態會標示為 finished
。 您不會在工作流程狀態中觀察到失敗。
解決方案
檢查 或process_manifest_task
的 Airflow 工作記錄process_single_manifest_file_task
。
範例 Kusto 查詢:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID has "<run_id>"
| order by ['time'] asc
範例追蹤輸出:
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
輸出表示已擷取的記錄。 對應至遺漏搜尋記錄的指令清單實體記錄會遭到捨棄,而不會擷取。
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
[2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb
已知問題
- 由於引用完整性工作沒有特定的錯誤記錄,因此您必須手動搜尋偵錯記錄語句,以查看是否透過搜尋服務擷取所有外部記錄。
下一步
前進到下列教學課程,並瞭解如何執行以指令清單為基礎的檔案擷取: