Поделиться через


Оркестрация заданий Apache Flink® с помощью диспетчера оркестрации рабочих процессов Фабрика данных Azure (на основе Apache Airflow)

Внимание

Эта функция в настоящее время доступна для предварительного ознакомления. Дополнительные условия использования для предварительных версий Microsoft Azure включают более юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в статье Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за нами для получения дополнительных обновлений в сообществе Azure HDInsight.

В этой статье описывается управление заданием Flink с помощью REST API Azure и конвейера данных оркестрации с помощью диспетчера оркестрации рабочих процессов Фабрика данных Azure. Фабрика данных Azure служба Диспетчера оркестрации рабочих процессов — это простой и эффективный способ создания сред Apache Airflow и управления ими, что позволяет легко запускать конвейеры данных в большом масштабе.

Apache Airflow — это платформа с открытым исходным кодом, которая программно создает, планирует и отслеживает сложные рабочие процессы данных. Он позволяет определить набор задач, называемых операторами, которые можно объединить в ациклические графы (DAG) для представления конвейеров данных.

На следующей схеме показано размещение Airflow, Key Vault и HDInsight в AKS в Azure.

Снимок экрана: размещение воздушных потоков, хранилища ключей и HDInsight в AKS в Azure.

Несколько субъектов-служб Azure создаются в зависимости от области, чтобы ограничить доступ к ней и управлять жизненным циклом учетных данных клиента независимо.

Рекомендуется периодически менять ключи доступа или секреты.

Шаги установки

  1. Настройка кластера Flink

  2. Отправьте jar-файл задания Flink в учетную запись хранения. Это может быть основная учетная запись хранения, связанная с кластером Flink или любой другой учетной записью хранения, где необходимо назначить роль "Владелец данных BLOB-объектов хранилища" назначенному пользователем MSI, используемому для кластера в этой учетной записи хранения.

  3. Azure Key Vault. Вы можете следовать этому руководству, чтобы создать azure Key Vault в случае, если у вас нет.

  4. Создайте субъект-службу Microsoft Entra для доступа к Key Vault— предоставьте разрешение на доступ к Azure Key Vault с ролью "Сотрудник по секретам Key Vault" и запишите идентификатор appId "password" и "tenant" из ответа. Мы должны использовать то же самое для Airflow, чтобы использовать хранилище Key Vault в качестве серверных элементов для хранения конфиденциальной информации.

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. Включите Azure Key Vault для диспетчера оркестрации рабочих процессов для хранения конфиденциальных данных и управления ими в безопасном и централизованном режиме. Для этого можно использовать переменные и подключения, а также автоматически храниться в Azure Key Vault. Имя подключений и переменных должно быть префиксировано variables_prefix, определенное в AIRFLOW__SECRETS__BACKEND_KWARGS. Например, если variables_prefix имеет значение в виде переменных hdinsight-aks-переменных, то для ключа переменной hello необходимо сохранить переменную в hdinsight-aks-variable -hello.

    • Добавьте следующие параметры для переопределения конфигурации Airflow в встроенных свойствах среды выполнения:

      • AIRFLOW__SECRETS__BACKEND: "airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS:
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • Добавьте следующий параметр для конфигурации переменных среды в свойствах интегрированной среды выполнения Airflow:

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      Добавление требований к Airflow — apache-airflow-providers-microsoft-azure

      Снимок экрана: конфигурация потока воздуха и переменные среды.

  6. Создайте субъект-службу Microsoft Entra для доступа к Azure— предоставьте разрешение на доступ к кластеру HDInsight AKS с ролью участника, запишите идентификатор приложения, пароль и клиент из ответа.

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. Создайте следующие секреты в хранилище ключей со значением из предыдущего идентификатора приложения субъекта-службы AD, пароля и клиента, префиксированного свойством variables_prefix, определенного в AIRFLOW__SECRETS__BACKEND_KWARGS. Код DAG может получить доступ к любой из этих переменных без variables_prefix.

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-переменные-api-secret=<Password from previous step>

    • hdinsight-aks-переменные-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

Определение DAG

DAG (направленный ациклический граф) — это основная концепция Airflow, сбор задач вместе, организованная с зависимостями и связями, чтобы сказать, как они должны работать.

Существует три способа объявления DAG:

  1. Вы можете использовать диспетчер контекстов, который добавляет DAG в любое содержимое внутри него неявно.

  2. Вы можете использовать стандартный конструктор, передав DAG в любые операторы, которые вы используете.

  3. С помощью @dag декоратора можно превратить функцию в генератор DAG (из airflow.декоратор импорта dag)

Группы доступности не имеют ничего без выполнения задач, и они находятся в виде операторов, датчиков или taskFlow.

Дополнительные сведения о группах управления, потоках управления, вложенных группах задач и т. д. можно узнать непосредственно из Apache Airflow. 

Выполнение DAG

Пример кода доступен в Git. Скачайте код локально на компьютере и отправьте wordcount.py в хранилище BLOB-объектов. Выполните действия, чтобы импортировать DAG в рабочий процесс, созданный во время установки.

Wordcount.py является примером оркестрации отправки задания Flink с помощью Apache Airflow с HDInsight в AKS. DAG имеет две задачи:

  • Получить OAuth Token

  • Вызов AZURE REST API отправки заданий HDInsight Flink для отправки нового задания

DaG ожидает установки для субъекта-службы, как описано в процессе установки учетных данных клиента OAuth и передать следующую входную конфигурацию для выполнения.

Шаги выполнения

  1. Выполните daG из пользовательского интерфейса Airflow, чтобы открыть пользовательский интерфейс диспетчера оркестрации рабочих процессов Фабрика данных Azure, щелкнув значок монитора.

    Снимок экрана: открытие пользовательского интерфейса диспетчера оркестрации рабочих процессов Фабрика данных Azure, щелкнув значок монитора.

  2. Выберите DAG FlinkWordCountExample на странице "DAG".

    Снимок экрана: выбор примера числа слов Flink.

  3. Щелкните значок "Выполнить" в правом верхнем углу и выберите "Активировать DAG w/config".

    Снимок экрана: значок выполнения.

  4. Передача необходимой конфигурации JSON

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscritpion":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. Нажмите кнопку "Триггер", она запускает выполнение DAG.

  6. Вы можете визуализировать состояние задач DAG из запуска DAG

    Снимок экрана: состояние задачи dag.

  7. Проверка выполнения задания с портала

    Снимок экрана: проверка выполнения задания.

  8. Проверка задания на панели мониторинга Apache Flink

    Снимок экрана: панель мониторинга Apache Flink.

Пример кода

Это пример оркестрации конвейера данных с помощью Airflow с HDInsight в AKS.

DaG ожидает установки субъекта-службы для учетных данных клиента OAuth и передает следующую входную конфигурацию для выполнения:

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscritpion':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

Справочные материалы