Оркестрация заданий Apache Flink® с помощью диспетчера оркестрации рабочих процессов Фабрика данных Azure (на основе Apache Airflow)
Внимание
Эта функция в настоящее время доступна для предварительного ознакомления. Дополнительные условия использования для предварительных версий Microsoft Azure включают более юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в статье Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за нами для получения дополнительных обновлений в сообществе Azure HDInsight.
В этой статье описывается управление заданием Flink с помощью REST API Azure и конвейера данных оркестрации с помощью диспетчера оркестрации рабочих процессов Фабрика данных Azure. Фабрика данных Azure служба Диспетчера оркестрации рабочих процессов — это простой и эффективный способ создания сред Apache Airflow и управления ими, что позволяет легко запускать конвейеры данных в большом масштабе.
Apache Airflow — это платформа с открытым исходным кодом, которая программно создает, планирует и отслеживает сложные рабочие процессы данных. Он позволяет определить набор задач, называемых операторами, которые можно объединить в ациклические графы (DAG) для представления конвейеров данных.
На следующей схеме показано размещение Airflow, Key Vault и HDInsight в AKS в Azure.
Несколько субъектов-служб Azure создаются в зависимости от области, чтобы ограничить доступ к ней и управлять жизненным циклом учетных данных клиента независимо.
Рекомендуется периодически менять ключи доступа или секреты.
Шаги установки
Отправьте jar-файл задания Flink в учетную запись хранения. Это может быть основная учетная запись хранения, связанная с кластером Flink или любой другой учетной записью хранения, где необходимо назначить роль "Владелец данных BLOB-объектов хранилища" назначенному пользователем MSI, используемому для кластера в этой учетной записи хранения.
Azure Key Vault. Вы можете следовать этому руководству, чтобы создать azure Key Vault в случае, если у вас нет.
Создайте субъект-службу Microsoft Entra для доступа к Key Vault— предоставьте разрешение на доступ к Azure Key Vault с ролью "Сотрудник по секретам Key Vault" и запишите идентификатор appId "password" и "tenant" из ответа. Мы должны использовать то же самое для Airflow, чтобы использовать хранилище Key Vault в качестве серверных элементов для хранения конфиденциальной информации.
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
Включите Azure Key Vault для диспетчера оркестрации рабочих процессов для хранения конфиденциальных данных и управления ими в безопасном и централизованном режиме. Для этого можно использовать переменные и подключения, а также автоматически храниться в Azure Key Vault. Имя подключений и переменных должно быть префиксировано variables_prefix, определенное в AIRFLOW__SECRETS__BACKEND_KWARGS. Например, если variables_prefix имеет значение в виде переменных hdinsight-aks-переменных, то для ключа переменной hello необходимо сохранить переменную в hdinsight-aks-variable -hello.
Добавьте следующие параметры для переопределения конфигурации Airflow в встроенных свойствах среды выполнения:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
Добавьте следующий параметр для конфигурации переменных среды в свойствах интегрированной среды выполнения Airflow:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
Добавление требований к Airflow — apache-airflow-providers-microsoft-azure
Создайте субъект-службу Microsoft Entra для доступа к Azure— предоставьте разрешение на доступ к кластеру HDInsight AKS с ролью участника, запишите идентификатор приложения, пароль и клиент из ответа.
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
Создайте следующие секреты в хранилище ключей со значением из предыдущего идентификатора приложения субъекта-службы AD, пароля и клиента, префиксированного свойством variables_prefix, определенного в AIRFLOW__SECRETS__BACKEND_KWARGS. Код DAG может получить доступ к любой из этих переменных без variables_prefix.
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-переменные-api-secret=
<Password from previous step>
hdinsight-aks-переменные-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
Определение DAG
DAG (направленный ациклический граф) — это основная концепция Airflow, сбор задач вместе, организованная с зависимостями и связями, чтобы сказать, как они должны работать.
Существует три способа объявления DAG:
Вы можете использовать диспетчер контекстов, который добавляет DAG в любое содержимое внутри него неявно.
Вы можете использовать стандартный конструктор, передав DAG в любые операторы, которые вы используете.
С помощью @dag декоратора можно превратить функцию в генератор DAG (из airflow.декоратор импорта dag)
Группы доступности не имеют ничего без выполнения задач, и они находятся в виде операторов, датчиков или taskFlow.
Дополнительные сведения о группах управления, потоках управления, вложенных группах задач и т. д. можно узнать непосредственно из Apache Airflow.
Выполнение DAG
Пример кода доступен в Git. Скачайте код локально на компьютере и отправьте wordcount.py в хранилище BLOB-объектов. Выполните действия, чтобы импортировать DAG в рабочий процесс, созданный во время установки.
Wordcount.py является примером оркестрации отправки задания Flink с помощью Apache Airflow с HDInsight в AKS. DAG имеет две задачи:
Получить
OAuth Token
Вызов AZURE REST API отправки заданий HDInsight Flink для отправки нового задания
DaG ожидает установки для субъекта-службы, как описано в процессе установки учетных данных клиента OAuth и передать следующую входную конфигурацию для выполнения.
Шаги выполнения
Выполните daG из пользовательского интерфейса Airflow, чтобы открыть пользовательский интерфейс диспетчера оркестрации рабочих процессов Фабрика данных Azure, щелкнув значок монитора.
Выберите DAG FlinkWordCountExample на странице "DAG".
Щелкните значок "Выполнить" в правом верхнем углу и выберите "Активировать DAG w/config".
Передача необходимой конфигурации JSON
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscritpion":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
Нажмите кнопку "Триггер", она запускает выполнение DAG.
Вы можете визуализировать состояние задач DAG из запуска DAG
Проверка выполнения задания с портала
Проверка задания на панели мониторинга Apache Flink
Пример кода
Это пример оркестрации конвейера данных с помощью Airflow с HDInsight в AKS.
DaG ожидает установки субъекта-службы для учетных данных клиента OAuth и передает следующую входную конфигурацию для выполнения:
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscritpion':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
Справочные материалы
- Ознакомьтесь с примером кода.
- Веб-сайт Apache Flink
- Apache, Apache Airflow, Airflow, Apache Flink, Flink и связанные открытый код имена проектов являются товарными знаками Apache Software Foundation (ASF).
Кері байланыс
https://aka.ms/ContentUserFeedback.
Жақында қолжетімді болады: 2024 жыл бойы біз GitHub Issues жүйесін мазмұнға арналған кері байланыс механизмі ретінде біртіндеп қолданыстан шығарамыз және оны жаңа кері байланыс жүйесімен ауыстырамыз. Қосымша ақпаратты мұнда қараңыз:Жіберу және пікірді көру