共用方式為


使用 Azure Data Factory 工作流程協調流程管理員的 Apache Flink® 作業協調流程 (由 Apache Airflow 提供電源)

重要

此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群取得更多更新。

本文涵蓋使用 Azure REST API 和 Azure Data Factory 工作流程協調流程管理員來管理 Flink 作業和協調流程數據管線。 Azure Data Factory 工作流程協調流程管理員 服務是建立和管理 Apache Airflow 環境的簡單且有效率的方式,可讓您輕鬆地大規模執行數據管線。

Apache Airflow 是開放原始碼平臺,以程式設計方式建立、排程及監視複雜的數據工作流程。 它可讓您定義一組稱為運算子的工作,這些運算符可以合併成有向無循環圖表(DAG)來代表數據管線。

下圖顯示 Azure 中 AKS 的 Airflow、金鑰保存庫 和 HDInsight 位置。

此螢幕快照顯示 Azure 中 AKS 上空流、金鑰保存庫和 HDInsight 的位置。

系統會根據範圍來建立多個 Azure 服務主體,以限制其所需的存取權,以及獨立管理客戶端認證生命週期。

建議定期輪替存取密鑰或秘密。

安裝步驟

  1. 設定 Flink 叢集

  2. 將您的 Flink 作業 jar 上傳至記憶體帳戶。 它可以是與 Flink 叢集或任何其他記憶體帳戶相關聯的主要記憶體帳戶,您應該在此記憶體帳戶中將「儲存體 Blob 數據擁有者」角色指派給用於此儲存器帳戶中叢集的使用者指派 MSI。

  3. Azure 金鑰保存庫 - 如果您沒有 Azure 金鑰保存庫,您可以遵循本教學課程來建立新的 Azure 金鑰保存庫

  4. 建立 Microsoft Entra 服務主體以存取 金鑰保存庫 – 使用「金鑰保存庫 秘密人員」角色授與存取 Azure 金鑰保存庫 的許可權,並記下回應中的 'appId' 'password' 和 'tenant'。 我們需要針對 Airflow 使用相同的 ,才能使用 金鑰保存庫 記憶體作為儲存敏感性資訊的後端。

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. 啟用工作流程協調流程管理員的 Azure 金鑰保存庫,以安全且集中的方式儲存和管理您的敏感性資訊。 如此一來,您就可以使用變數和連線,而且它們會自動儲存在 Azure 金鑰保存庫 中。 連接和變數的名稱必須加上AIRFLOW__SECRETS__BACKEND_KWARGS中定義的variables_prefix前置詞。 例如,如果variables_prefix具有 hdinsight-aks-variables 的值,則針對 hello 的變數索引鍵,您會想要將變數儲存在 hdinsight-aks-variable -hello。

    • 在整合式運行時間屬性中新增 Airflow 組態覆寫的下列設定:

      • AIRFLOW__SECRETS__BACKEND: "airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS:
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • 在 Airflow 整合執行時間屬性中新增環境變數組態的下列設定:

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      新增 Airflow 需求 - apache-airflow-providers-microsoft-azure

      顯示數據流組態和環境變數的螢幕快照。

  6. 建立 Microsoft Entra 服務主體 以存取 Azure – 授與具有參與者角色的 HDInsight AKS 叢集存取許可權,記下回應中的 appId、密碼和租使用者。

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. 在密鑰保存庫中建立下列秘密,其中包含先前 AD 服務主體 appId、密碼和租使用者的值,前面加上屬性variables_prefix定義於 AIRFLOW__SECRETS__BACKEND_KWARGS。 DAG 程式代碼可以存取任何這些變數,而不需要variables_prefix。

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-variables-api-secret=<Password from previous step>

    • hdinsight-aks-variables-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

DAG 定義

DAG(有向無循環圖)是 Airflow 的核心概念,會一起收集工作,並以相依性和關聯性組織,以說明其應該如何執行。

有三種方式可以宣告 DAG:

  1. 您可以使用內容管理員,以隱含方式將 DAG 新增至其中的任何專案

  2. 您可以使用標準建構函式,將 DAG 傳遞至您使用的任何運算元

  3. 您可以使用 @dag 裝飾專案將函式轉換成 DAG 產生器(從 airflow.decorators 匯入 dag)

DAG 沒有要執行的工作,而這些工作是以運算符、感測器或TaskFlow的形式出現。

您可以直接從 Apache Airflow 閱讀 DAG、控制流程、SubDAG、TaskGroups 等詳細數據。 

DAG 執行

Git提供範例程式代碼;在本機計算機上下載程式代碼,並將 wordcount.py 上傳至 Blob 記憶體。 請遵循步驟,將DAG匯入安裝期間建立的工作流程。

wordcount.py 是使用 Apache Airflow 搭配 AKS 上的 HDInsight 協調 Flink 作業提交的範例。 此範例是以 Apache Flink提供的 wordcount 範例為基礎。

DAG 有兩項工作:

  • 獲取 OAuth Token

  • 叫用 HDInsight Flink 作業提交 Azure REST API 以提交新作業

DAG 預期會有服務主體的設定,如 OAuth 客戶端認證的安裝程式中所述,並傳遞下列輸入組態來執行。

執行步驟

  1. Airflow UI 執行 DAG,您可以按兩下 [監視] 圖示來開啟 Azure Data Factory 工作流程協調流程管理員 UI。

    顯示開啟 Azure Data Factory 工作流程協調流程管理員 UI 的螢幕快照,方法是按兩下監視圖示。

  2. 從 [DAG] 頁面中選取 “FlinkWordCountExample” DAG。

    顯示選取 Flink 字數計數範例的螢幕快照。

  3. 按兩下右上角的 [執行] 圖示,然後選取 [觸發DAG w/ config]。

    顯示選取執行圖示的螢幕快照。

  4. 傳遞必要的設定 JSON

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscritpion":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. 按兩下 [觸發程式] 按鈕,就會開始執行DAG。

  6. 您可以從 DAG 執行將 DAG 工作的狀態可視化

    顯示 dag 工作狀態的螢幕快照。

  7. 從入口網站驗證作業執行

    顯示驗證作業執行的螢幕快照。

  8. 從 「Apache Flink 儀錶板」驗證作業

    顯示apache Flink儀錶板的螢幕快照。

程式碼範例

這是在 AKS 上使用 Airflow 搭配 HDInsight 協調數據管線的範例

此範例是以 Apache Flink 上 提供的 wordcount 範例為基礎

DAG 預期已針對 OAuth 用戶端認證設定服務主體,並針對執行傳遞下列輸入組態

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscritpion':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

參考