使用 Azure Data Factory 工作流程協調流程管理員的 Apache Flink® 作業協調流程 (由 Apache Airflow 提供電源)
重要
此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight 上提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群上取得更多更新。
本文涵蓋使用 Azure REST API 和 Azure Data Factory 工作流程協調流程管理員來管理 Flink 作業和協調流程數據管線。 Azure Data Factory 工作流程協調流程管理員 服務是建立和管理 Apache Airflow 環境的簡單且有效率的方式,可讓您輕鬆地大規模執行數據管線。
Apache Airflow 是開放原始碼平臺,以程式設計方式建立、排程及監視複雜的數據工作流程。 它可讓您定義一組稱為運算子的工作,這些運算符可以合併成有向無循環圖表(DAG)來代表數據管線。
下圖顯示 Azure 中 AKS 的 Airflow、金鑰保存庫 和 HDInsight 位置。
系統會根據範圍來建立多個 Azure 服務主體,以限制其所需的存取權,以及獨立管理客戶端認證生命週期。
建議定期輪替存取密鑰或秘密。
安裝步驟
將您的 Flink 作業 jar 上傳至記憶體帳戶。 它可以是與 Flink 叢集或任何其他記憶體帳戶相關聯的主要記憶體帳戶,您應該在此記憶體帳戶中將「儲存體 Blob 數據擁有者」角色指派給用於此儲存器帳戶中叢集的使用者指派 MSI。
Azure 金鑰保存庫 - 如果您沒有 Azure 金鑰保存庫,您可以遵循本教學課程來建立新的 Azure 金鑰保存庫。
建立 Microsoft Entra 服務主體以存取 金鑰保存庫 – 使用「金鑰保存庫 秘密人員」角色授與存取 Azure 金鑰保存庫 的許可權,並記下回應中的 'appId' 'password' 和 'tenant'。 我們需要針對 Airflow 使用相同的 ,才能使用 金鑰保存庫 記憶體作為儲存敏感性資訊的後端。
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
啟用工作流程協調流程管理員的 Azure 金鑰保存庫,以安全且集中的方式儲存和管理您的敏感性資訊。 如此一來,您就可以使用變數和連線,而且它們會自動儲存在 Azure 金鑰保存庫 中。 連接和變數的名稱必須加上AIRFLOW__SECRETS__BACKEND_KWARGS中定義的variables_prefix前置詞。 例如,如果variables_prefix具有 hdinsight-aks-variables 的值,則針對 hello 的變數索引鍵,您會想要將變數儲存在 hdinsight-aks-variable -hello。
在整合式運行時間屬性中新增 Airflow 組態覆寫的下列設定:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
在 Airflow 整合執行時間屬性中新增環境變數組態的下列設定:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
新增 Airflow 需求 - apache-airflow-providers-microsoft-azure
建立 Microsoft Entra 服務主體 以存取 Azure – 授與具有參與者角色的 HDInsight AKS 叢集存取許可權,記下回應中的 appId、密碼和租使用者。
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
在密鑰保存庫中建立下列秘密,其中包含先前 AD 服務主體 appId、密碼和租使用者的值,前面加上屬性variables_prefix定義於 AIRFLOW__SECRETS__BACKEND_KWARGS。 DAG 程式代碼可以存取任何這些變數,而不需要variables_prefix。
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
DAG 定義
DAG(有向無循環圖)是 Airflow 的核心概念,會一起收集工作,並以相依性和關聯性組織,以說明其應該如何執行。
有三種方式可以宣告 DAG:
您可以使用內容管理員,以隱含方式將 DAG 新增至其中的任何專案
您可以使用標準建構函式,將 DAG 傳遞至您使用的任何運算元
您可以使用 @dag 裝飾專案將函式轉換成 DAG 產生器(從 airflow.decorators 匯入 dag)
DAG 沒有要執行的工作,而這些工作是以運算符、感測器或TaskFlow的形式出現。
您可以直接從 Apache Airflow 閱讀 DAG、控制流程、SubDAG、TaskGroups 等詳細數據。
DAG 執行
Git 上提供範例程式代碼;在本機計算機上下載程式代碼,並將 wordcount.py 上傳至 Blob 記憶體。 請遵循步驟,將DAG匯入安裝期間建立的工作流程。
wordcount.py 是使用 Apache Airflow 搭配 AKS 上的 HDInsight 協調 Flink 作業提交的範例。 此範例是以 Apache Flink 上提供的 wordcount 範例為基礎。
DAG 有兩項工作:
獲取
OAuth Token
叫用 HDInsight Flink 作業提交 Azure REST API 以提交新作業
DAG 預期會有服務主體的設定,如 OAuth 客戶端認證的安裝程式中所述,並傳遞下列輸入組態來執行。
執行步驟
從 Airflow UI 執行 DAG,您可以按兩下 [監視] 圖示來開啟 Azure Data Factory 工作流程協調流程管理員 UI。
從 [DAG] 頁面中選取 “FlinkWordCountExample” DAG。
按兩下右上角的 [執行] 圖示,然後選取 [觸發DAG w/ config]。
傳遞必要的設定 JSON
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscritpion":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
按兩下 [觸發程式] 按鈕,就會開始執行DAG。
您可以從 DAG 執行將 DAG 工作的狀態可視化
從入口網站驗證作業執行
從 「Apache Flink 儀錶板」驗證作業
程式碼範例
這是在 AKS 上使用 Airflow 搭配 HDInsight 協調數據管線的範例
此範例是以 Apache Flink 上 提供的 wordcount 範例為基礎
DAG 預期已針對 OAuth 用戶端認證設定服務主體,並針對執行傳遞下列輸入組態
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscritpion':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
參考
- 請參閱範例程序 代碼。
- Apache Flink 網站
- Apache、Apache Airflow、Airflow、Apache Flink、Flink 和相關聯的 開放原始碼 項目名稱是 Apache Software Foundation (ASF) 的商標。
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應