分享方式:


使用 Airflow 工作記錄針對指令清單擷取問題進行疑難解答

本文可協助您使用 Airflow 工作記錄,針對 Azure Data Manager for Energy 中的指令清單擷取工作流程問題進行疑難解答。

指令清單擷取DAG工作流程類型

指令清單擷取有兩種類型的有向無循環圖表 (DAG) 工作流程:單一指令清單和批次上傳。

單一指令清單

單一指令清單檔案可用來觸發指令清單擷取工作流程。

DagTaskName 值 描述
update_status_running_task 呼叫工作流程服務,並將 DAG 的狀態標示為 running 資料庫中。
check_payload_type 驗證擷取的類型是批次或單一指令清單。
validate_manifest_schema_task 確保指令清單中提及的所有架構類型都存在,而且有引用架構完整性。 所有無效的值都會從指令清單收回。
provide_manifest_intergrity_task 驗證 OSDU® R3 指令清單內的參考,並移除無效的實體。 此運算子負責父/子驗證。 所有類似孤立的實體都會記錄並排除在已驗證的指令清單中。 系統會搜尋任何外部參考的記錄。 如果找不到任何專案,則會卸除指令清單實體。 也會解析所有 Surrogate 索引鍵參考。
process_single_manifest_file_task 執行從上一個步驟取得的最終指令清單實體擷取。 數據記錄會透過記憶體服務內嵌。
update_status_finished_task 呼叫工作流程服務,並將 DAG 的狀態標示為 finishedfailed 資料庫中。

批次上傳

多個指令清單檔案是相同工作流程服務要求的一部分。 要求承載中的指令清單區段是清單,而不是專案的字典。

DagTaskName 值 描述
update_status_running_task 呼叫工作流程服務,並將 DAG 的狀態標示為 running 資料庫中。
check_payload_type 驗證擷取的類型是批次或單一指令清單。
batch_upload 將指令清單清單分成三個批次,以平行方式處理。 (未發出任何工作記錄檔。
process_manifest_task_(1 / 2 / 3) 將指令清單清單分成三個群組,並加以處理。 、 provide_manifest_intergrity_taskprocess_single_manifest_file_taskvalidate_manifest_schema_task執行的所有步驟都會壓縮並依序在這些工作中執行。
update_status_finished_task 呼叫工作流程服務,並將 DAG 的狀態標示為 finishedfailed 資料庫中。

根據承載類型(單一或批次),工作 check_payload_type 會選擇適當的分支,並略過其他分支中的工作。

必要條件

您應該已整合 Airflow 工作記錄與 Azure 監視器。 請參閱 整合 Airflow 記錄與 Azure 監視器

下列數據行會在 Airflow 工作記錄中公開,讓您偵錯問題:

參數名稱 描述
RunID 觸發之 DAG 執行的唯一執行標識碼。
CorrelationID DAG 執行的唯一相互關聯標識碼(與執行標識符相同)。
DagName DAG 工作流程名稱。 例如, Osdu_ingest 是指令清單擷取的工作流程名稱。
DagTaskName DAG 工作流程的工作名稱。 例如, update_status_running_task 是指令清單擷取的工作名稱。
Content 在工作執行期間,Airflow 發出的錯誤記錄訊息(錯誤或例外狀況)。
LogTimeStamp DAG 執行的時間間隔。
LogLevel 錯誤的層級。 DEBUG值為 、INFOWARNINGERROR。 您可以藉由篩選層級 ERROR 來查看大部分的例外狀況和錯誤訊息。

執行失敗的 DAG

或 中的Update_status_running_taskUpdate_status_finished_task工作流程執行失敗,而且數據記錄並未內嵌。

可能的原因

  • 未驗證數據分割 API 的呼叫,因為數據分割標識元不正確。
  • 要求主體執行內容中的索引鍵名稱不正確。
  • 工作流程服務未執行或擲回 5xx 錯誤。

工作流程狀態

工作流程狀態會標示為 failed

解決方案

檢查 或update_status_finished_task的 Airflow 工作記錄update_status_running_task。 傳遞正確的數據分割識別碼或索引鍵名稱,以修正承載。

範例 Kusto 查詢:

    OEPAirFlowTask
        | where DagName == "Osdu_ingest"
        | where DagTaskName == "update_status_running_task"
        | where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
        | where RunID == '<run_id>'

範例追蹤輸出:

    [2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
        data_partition_id = ctx_payload['data-partition-id']
    KeyError: 'data-partition-id'
    
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264

失敗的架構驗證

記錄未擷取,因為架構驗證失敗。

可能的原因

  • 架構服務擲回「找不到架構」錯誤。
  • 指令清單主體不符合架構類型。
  • 架構參考不正確。
  • 架構服務擲回 5xx 錯誤。

工作流程狀態

工作流程狀態會標示為 finished。 您不會在工作流程狀態中觀察到失敗,因為會略過無效的實體並繼續進行擷取。

解決方案

檢查 或process_manifest_task的 Airflow 工作記錄validate_manifest_schema_task。 傳遞正確的數據分割識別碼或索引鍵名稱,以修正承載。

範例 Kusto 查詢:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID == "<run_id>"
    | order by ['time'] asc  

範例追蹤輸出:

    Error traces to look out for
    [2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
    [2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
    [2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
    [2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
    [2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
    [2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
    
    Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
        {'description': 'The stop value/last value of the ReferenceCurveID, '
                        'typically the end depth of the logging.',
         'example': 7500,
         'title': 'Sampling Stop',
         'type': 'number',
         'x-osdu-frame-of-reference': 'UOM'}
    
    On instance['data']['SamplingStop']:
        'string-value'

失敗的參考檢查

記錄並未擷取,因為參考檢查失敗。

可能的原因

  • 找不到參考的記錄。
  • 找不到父記錄。
  • 搜尋服務擲回 5xx 錯誤。

工作流程狀態

工作流程狀態會標示為 finished。 您不會在工作流程狀態中觀察到失敗,因為會略過無效的實體並繼續進行擷取。

解決方案

檢查 或process_manifest_task的 Airflow 工作記錄provide_manifest_integrity_task

範例 Kusto 查詢:

    OEPAirFlowTask
        | where DagName has "Osdu_ingest"
        | where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
        | where Content has 'Search query "'or Content has 'response ids: ['
        | where RunID has "<run_id>"

由於參考完整性工作沒有特別的錯誤記錄,請檢查偵錯記錄語句,以查看是否透過搜尋服務擷取所有外部記錄。

例如,下列範例追蹤輸出會顯示透過搜尋服務查詢的記錄,以取得引用完整性:

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"

輸出會顯示已擷取且位於系統中的記錄。 參考記錄的相關指令清單物件將會卸除,如果您注意到某些記錄不存在,就不會再內嵌。

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a    ']

記錄並未內嵌,因為指令清單包含無效的法律標籤或訪問控制清單(ACL)。

可能的原因

  • ACL 不正確。
  • 法律標籤不正確。
  • 記憶體服務擲回 5xx 錯誤。

工作流程狀態

工作流程狀態會標示為 finished。 您不會在工作流程狀態中觀察到失敗。

解決方案

檢查 或process_manifest_task的 Airflow 工作記錄process_single_manifest_file_task

範例 Kusto 查詢:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID has "<run_id>"
    | order by ['time'] asc 

範例追蹤輸出:

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
    

輸出表示已擷取的記錄。 對應至遺漏搜尋記錄的指令清單實體記錄會遭到捨棄,而不會擷取。

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
    [2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb

已知問題

  • 由於引用完整性工作沒有特定的錯誤記錄,因此您必須手動搜尋偵錯記錄語句,以查看是否透過搜尋服務擷取所有外部記錄。

下一步

前進到下列教學課程,並瞭解如何執行以指令清單為基礎的檔案擷取:

參考資料