分享方式:


使用 Azure Data Factory 工作流程協調管理員的 Apache Flink® 作業協調流程 (由 Apache Airflow 提供)

注意

AKS 上的 Azure HDInsight 將於 2025 年 1 月 31 日退場。 請於 2025 年 1 月 31 日之前,將工作負載移轉至 Microsoft Fabric 或對等的 Azure 產品,以免工作負載突然終止。 訂用帳戶中剩餘的叢集將會停止,並會從主機移除。

在淘汰日期之前,只有基本支援可用。

重要

此功能目前為預覽功能。 Microsoft Azure 預覽版增補使用規定包含適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的更多法律條款。 若需此特定預覽版的相關資訊,請參閱 Azure HDInsight on AKS 預覽版資訊。 如有問題或功能建議,請在 AskHDInsight 上提交要求並附上詳細資料,並且在 Azure HDInsight 社群上追蹤我們以獲得更多更新資訊。

本文涵蓋使用 Azure REST API 與協調流程資料管線搭配 Azure Data Factory 工作流程協調管理員,管理 Flink 作業。 Azure Data Factory 工作流程協調管理員 服務是建立及管理 Apache Airflow 環境的方式,簡單且有效率,可讓您輕鬆地大規模執行資料管道。

Apache Airflow 是開放原始碼平台,以程式設計方式建立、排程及監視複雜的資料工作流程。 它可讓您定義一組稱為運算子的工作,這些運算子可以合併成有向非循環圖 (DAG) 代表資料管道。

下圖顯示 Azure 中 AKS 上的 Airflow、Key Vault 和 HDInsight 節點的位置。

螢幕擷取畫面顯示 Azure 中 AKS 上的 Airflow、Key Vault 和 HDInsight 節點的位置。

系統會根據範圍來建立多個 Azure 服務主體,以限制其所需的存取權,以及獨立管理用戶端認證生命週期。

建議定期輪替存取金鑰或祕密。

安裝步驟

  1. 設定 Flink 叢集

  2. 將您的 Flink 作業 jar 上傳至儲存體帳戶。 它可以是與 Flink 叢集或任何其他儲存體帳戶相關聯的主要儲存體帳戶,您應該在這裡將「儲存體 Blob 資料擁有者」角色,指派給用於這個儲存體帳戶叢集使用的使用者指派 MSI。

  3. Azure Key Vault - 如果您沒有 Azure Key Vault,可以遵循這個教學課程建立新的 Azure Key Vault

  4. 建立 Microsoft Entra 服務主體以存取金鑰保存庫 – 授與權限以使用「金鑰保存庫祕密長」角色存取 Azure Key Vault,並記下回應提供的「appId」、「密碼」與「租用戶」。 若要使用金鑰保存庫儲存體做為儲存敏感資訊的後端,Airflow 必須使用相同的資訊。

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. 啟用適用於 Azure Key Vault 的工作流程協調管理員,以安全且集中的方式儲存及管理敏感資訊。 如此一來,您就可以使用變數和連線,並可自動將其儲存在 Azure Key Vault。 連線和變數的名稱前面必須加上 AIRFLOW__SECRETS__BACKEND_KWARGS 定義的 variables_prefix 前置詞。 例如,如果 variables_prefix 具有 hdinsight-aks-variables 的值,則針對 hello 的變數鍵值,不妨將變數儲存在 hdinsight-aks-variable -hello。

    • 在整合式執行階段屬性,新增 Airflow 設定覆寫的下列設定:

      • AIRFLOW__SECRETS__BACKEND:"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS:
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • 在 Airflow 整合執行階段屬性新增環境變數組態的下列設定:

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      新增 Airflow 需求 - apache-airflow-providers-microsoft-azure

      顯示資料流組態和環境變數的螢幕擷取畫面。

  6. 建立 Microsoft Entra 服務主體以存取 Azure – 使用參與者角色授與權限存取 HDInsight AKS 叢集,並且記下回應提供的 appId、密碼和租用戶。

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. 使用先前 AD 服務主體 appId、密碼和租用戶的值,在金鑰保存庫中建立下列祕密,前面加上 AIRFLOW__SECRETS__BACKEND_KWARGS 定義的 variables_prefix 屬性。 DAG 程式碼不需要 variables_prefix,即可存取這些變數的任何一個。

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-variables-api-secret=<Password from previous step>

    • hdinsight-aks-variables-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

DAG 定義

DAG (有向非循環圖) 是 Airflow 的核心概念,會將工作收集在一起,並以相依性和關聯性的方式整理,說明工作應該如何執行。

有三種方式可以宣告 DAG:

  1. 您可以使用內容管理員,以隱含方式將 DAG 新增至其中的任何項目

  2. 您可以使用標準建構函式,將 DAG 傳遞至任何您使用的運算子

  3. 您可以使用 @dag 裝飾項目將函式轉換成 DAG 產生器 (從 airflow.decorators 匯入 dag)

DAG 若沒有要執行的工作便毫無意義,以運算子、感應器或 TaskFlow 的形式出現。

您可以直接從 Apache Airflow,閱讀 DAG、控制流程、SubDAG、TaskGroups 的相關詳細資料。 

DAG 執行

Git 提供程式碼範例;在本機電腦下載程式碼,並將 wordcount.py 上傳至 Blob 儲存體。 請遵循步驟,將 DAG 匯入安裝期間建立的工作流程。

wordcount.py 是使用 Apache Airflow 搭配 HDInsight on AKS 協調 Flink 作業提交的範例。 DAG 有兩項工作:

  • 取得 OAuth Token

  • 叫用 HDInsight Flink 作業提交 Azure REST API 以提交新作業

DAG 預期有服務主體的設定,如 OAuth 用戶端認證安裝流程所述,並且會傳遞下列執行輸入設定。

執行步驟

  1. Airflow UI 執行 DAG,按一下 [監視] 圖示即可開啟 Azure Data Factory 工作流程協調管理員 UI。

    顯示開啟 Azure Data Factory 工作流程協調管理員 UI 的螢幕擷取畫面,方法是按一下監視圖示。

  2. 從 [DAG] 頁面選取 “FlinkWordCountExample” DAG。

    顯示選取 Flink 字數計數範例的螢幕擷取畫面。

  3. 按一下右上角的 [執行] 圖示,然後選取 [使用 config 觸發 DAG]。

    顯示選取執行圖示的螢幕擷取畫面。

  4. 傳遞必要的設定 JSON

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscritpion":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. 按一下 [觸發] 按鈕,就會開始執行 DAG。

  6. 您可以從 DAG 執行將 DAG 工作狀態視覺化

    顯示 DAG 工作狀態的螢幕擷取畫面。

  7. 從入口網站驗證作業執行

    顯示驗證作業執行的螢幕擷取畫面。

  8. 從 [Apache Flink 儀表板] 驗證作業

    顯示 apache Flink 儀表板的螢幕擷取畫面。

程式碼範例

這是使用 Airflow 搭配 HDInsight on AKS 協調資料管道的範例。

DAG 預期有 OAuth 用戶端認證服務主體的設定,並且會傳遞下列執行輸入設定:

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscritpion':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

參考