Поделиться через


Управление заданиями Apache Flink® в HDInsight в кластерах AKS

Примечание.

Мы отставим Azure HDInsight в AKS 31 января 2025 г. До 31 января 2025 г. необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого прекращения рабочих нагрузок. Оставшиеся кластеры в подписке будут остановлены и удалены из узла.

До даты выхода на пенсию будет доступна только базовая поддержка.

Внимание

Эта функция в настоящее время доступна для предварительного ознакомления. Дополнительные условия использования для предварительных версий Microsoft Azure включают более юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в статье Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за нами для получения дополнительных обновлений в сообществе Azure HDInsight.

HDInsight в AKS предоставляет функцию для управления заданиями Apache Flink® непосредственно через портал Azure (удобный интерфейс) и REST API ARM.

Эта функция позволяет пользователям эффективно контролировать и отслеживать свои задания Apache Flink, не требуя глубоких знаний на уровне кластера.

Льготы

  • Упрощенное управление заданиями. Благодаря собственной интеграции Apache Flink в портал Azure пользователи больше не требуют обширных знаний о кластерах Flink для отправки, управления и мониторинга заданий.

  • Пользовательский REST API: HDInsight в AKS предоставляет удобные API REST ARM для отправки заданий Flink и управления ими. Пользователи могут отправлять задания Flink из любой службы Azure с помощью этих REST API.

  • Обновления заданий без усилий и управление состоянием. Встроенная интеграция портал Azure предоставляет бесплатный интерфейс для обновления заданий и их восстановления до последнего сохраненного состояния (savepoint). Эта функция обеспечивает непрерывность и целостность данных на протяжении всего жизненного цикла задания.

  • Автоматизация задания Flink с помощью конвейера Azure. Использование HDInsight в AKS, пользователи Flink имеют доступ к удобному API REST ARM, вы можете легко интегрировать операции задания Flink в Azure Pipeline. Независимо от того, запускаете ли вы новые задания, обновляете выполняемые задания или выполняете различные операции задания, этот упрощенный подход устраняет действия вручную. Он позволяет эффективно управлять кластером Flink.

Необходимые компоненты

Перед отправкой заданий на портале или REST API есть некоторые предварительные требования.

  • Создайте каталог в основной учетной записи хранения кластера для отправки jar-файла задания.

  • Если пользователь хочет сохранить точки сохранения, создайте каталог в учетной записи хранения для точек сохранения заданий.

    Снимок экрана: структура каталогов.

Ключевые функции и операции

  • Новая отправка заданий. Пользователи могут легко отправлять новую Flink, устраняя необходимость сложных конфигураций или внешних средств.

  • Остановите и запустите задания с помощью точек сохранения: пользователи могут корректно остановить и запустить задания Flink из предыдущего состояния (Savepoint). Точки сохранения обеспечивают сохранение хода выполнения задания, обеспечивая простое возобновление работы.

  • Обновления заданий: пользователь может обновить выполняемую работу после обновления jar-файла в учетной записи хранения. Это обновление автоматически принимает точку сохранения и запускает задание с новым jar-файлом.

  • Обновления без отслеживания состояния. Выполнение нового перезапуска для задания упрощается с помощью обновлений без отслеживания состояния. Эта функция позволяет пользователям инициировать чистый перезапуск с помощью обновленного jar-файла задания.

  • Управление точками сохранения. В любой момент пользователи могут создавать точки сохранения для выполняемых заданий. Эти точки сохранения можно перечислить и использовать для перезапуска задания с определенной контрольной точки по мере необходимости.

  • Отмена. Это отменяет задание окончательно.

  • Удаление: удаление записи журнала заданий.

Параметры управления заданиями в HDInsight в AKS

HDInsight в AKS предоставляет способы управления заданиями Flink.

Управление заданиями из портал Azure

Чтобы запустить задание Flink на портале, перейдите по следующим ссылкам:

Портал -> HDInsight в пуле кластеров AKS -> Flink Cluster --> Settings --> Flink Jobs

Снимок экрана: запуск задания flink.

  • Новое задание. Чтобы отправить новое задание, отправьте jar-файлы задания в учетную запись хранения и создайте каталог точки сохранения. Заполните шаблон с необходимыми конфигурациями и отправьте задание.

    Снимок экрана: создание нового задания.

    Сведения о свойстве:

    Свойство Описание: Значение по умолчанию Обязательно
    Имя задания Уникальное имя задания. Это отображается на портале. Имя задания должно быть небольшим. Да
    Путь jar Путь к хранилищу для jar-файла задания. Пользователи должны создать каталог в хранилище кластера и отправить jar-файл задания. Да
    Класс записи Класс записи для задания, из которого запускается выполнение задания. Да
    Args Аргумент основной программы задания. Разделите все аргументы пробелами. No
    параллелизм Параллелизм задания Flink. 2 Да
    savepoint.directory Каталог Savepoint для задания. Рекомендуется создать новый каталог для сохранения заданий в учетной записи хранения. abfs://<container>@<account>/<deployment-ID>/savepoints No

    После запуска задания состояние задания на портале выполняется.

  • Остановка: задание остановки не требует каких-либо параметров, пользователь может остановить задание, выбрав действие.

    Снимок экрана: как пользователь может остановить задание.

    После остановки задания состояние задания на портале остановлено.

  • Начало. Это действие запускает задание из точки сохранения. Чтобы запустить задание, выберите остановленное задание и запустите его.

    Снимок экрана: запуск задания пользователя.

    Заполните шаблон потока необходимыми параметрами и запустите его. Пользователям необходимо выбрать точку сохранения, с которой пользователь хочет запустить задание. По умолчанию она принимает последнюю успешную точку сохранения.

    Снимок экрана: шаблон потока заливки.

    Сведения о свойстве:

    Свойство Описание: Значение по умолчанию Обязательно
    Args Аргумент основной программы задания. Все аргументы должны быть разделены пробелами. No
    Последняя точка сохранения Последнее успешное выполнение точки сохранения перед остановкой задания. Это будет использоваться по умолчанию, если не выбрана точка сохранения. Не редактируемый
    Имя точки сохранения Пользователи могут перечислить доступную точку сохранения для задания и выбрать ее, чтобы запустить задание. No

    После запуска задания состояние задания на портале будет ЗАПУЩЕНо.

  • Обновление: обновление помогает перезапустить задания с обновленным кодом задания. Пользователям необходимо обновить последний jar-файл задания в расположении хранилища и обновить задание на портале. Это обновление останавливает задание с помощью точки сохранения и начинается с последнего jar-файла.

    Снимок экрана: перезапуск заданий с обновленным кодом задания.

    Шаблон для обновления задания.

    Снимок экрана: шаблон для обновления задания.

    После обновления задания состояние задания на портале — "ВЫПОЛНЕНИЕ".

  • Обновление без отслеживания состояния: это задание похоже на обновление, но оно включает в себя свежий перезапуск задания с последним кодом.

    Снимок экрана: свежий перезапуск задания с последним кодом.

    Шаблон для обновления задания.

    Снимок экрана: шаблон для обновления задания без отслеживания состояния.

    Сведения о свойстве:

    Свойство Описание: Значение по умолчанию Обязательно
    Args Аргумент основной программы задания. Разделите все аргументы пробелами. No

    После обновления задания состояние задания на портале выполняется.

  • Savepoint: возьмите точку сохранения для задания Flink.

    Снимок экрана: точка сохранения для задания Flink.

    Точка экономии занимает много времени и занимает некоторое время. Состояние действия задания отображается как выполняющееся.

    Снимок экрана: состояние действия задания.

  • Отмена. Это задание помогает пользователю завершить задание.

    Снимок экрана: как пользователь может завершить задание.

  • Удаление: удаление данных задания с портала.

    Снимок экрана: как пользователь может удалять данные задания с портала.

  • Просмотр сведений о задании. Чтобы просмотреть пользователя сведений о задании, можно щелкнуть имя задания, он содержит сведения о задании и последнем результате действия.

    Снимок экрана: просмотр сведений о задании.

    Для любого неудачного действия этот json задания предоставляет подробные исключения и причины сбоя.

Управление заданиями с помощью REST API

HDInsight в AKS поддерживает удобные API REST ARM для отправки заданий и управления заданиями. С помощью этого REST API Flink можно легко интегрировать операции задания Flink в Azure Pipeline. Независимо от того, запускаете ли вы новые задания, обновляете выполняемые задания или выполняете различные операции задания, этот упрощенный подход устраняет действия вручную и позволяет эффективно управлять кластером Flink.

Базовый формат URL-адреса для REST API

См. следующий URL-адрес для rest API, пользователи должны заменить подписку, группу ресурсов, пул кластеров, имя кластера и HDInsight в версии API AKS перед его использованием. https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}

С помощью этого REST API пользователи могут инициировать новые задания, остановить задания, запустить задания, создать точки сохранения, отменить задания и удалить задания. Текущая API_VERSION — 2023-06-01-preview.

Проверка подлинности REST API

Чтобы пройти проверку подлинности пользователей REST API Flink ARM, необходимо получить маркер носителя или маркер доступа для ресурса ARM. Чтобы выполнить проверку подлинности REST API Azure ARM (Azure Resource Manager) с помощью субъекта-службы, выполните следующие общие действия.

  • Создайте субъект-службу.

    az ad sp create-for-rbac --name <your-SP-name>

  • Предоставьте владельцу разрешение на пакет обновления для flink кластера.

  • Вход с помощью субъекта-службы.

    az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>

  • Получение маркера доступа.

    $token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json

    $tok = $token.accesstoken

    Пользователи могут использовать маркер в URL-адресе, показанном.

    $data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }

Проверка подлинности с помощью управляемого удостоверения: пользователи могут использовать ресурсы, поддерживающие управляемое удостоверение, для вызова REST API задания. Дополнительные сведения см. в документации по управляемому удостоверению .

СПИСОК API и параметров

  • Новое задание: rest API для отправки нового задания в Flink.

    Вариант Значение
    Способ POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    Верхний колонтитул Authorization = "Bearer $token"

    Текст запроса:

    { 
        "properties": { 
            "jobType": "FlinkJob", 
            "jobName": "<JOB_NAME>", 
            "action": "NEW", 
            "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", 
            "jarName": "<JOB_JAR_NAME>", 
            "entryClass": "<JOB_ENTRY_CLASS>", 
            “args”: ”<JOB_JVM_ARGUMENT>”
            "flinkConfiguration": { 
                "parallelism": "<JOB_PARALLELISM>", 
                "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" 
            } 
         } 
    }
    

    Сведения о свойстве для текста JSON:

    Свойство Описание: Значение по умолчанию Обязательно
    jobType Тип задания. Это должно быть "FlinkJob" Да
    jobName Уникальное имя задания. Это отображается на портале. Имя задания должно быть небольшим. Да
    действие Он указывает тип операции в задании. Оно должно быть "NEW" всегда для запуска нового задания. Да
    jobJarDirectory Путь к хранилищу для jar-каталога задания. Пользователи должны создать каталог в хранилище кластера и отправить jar-файл задания. Да
    jarName Имя jar задания. Да
    entryClass Класс записи для задания, из которого запускается выполнение задания. Да
    args Аргумент основной программы задания. Разделяйте аргументы с пробелом. No
    параллелизм Параллелизм задания Flink. 2 Да
    savepoint.directory Каталог Savepoint для задания. Рекомендуется создать новый каталог для сохранения заданий в учетной записи хранения. abfs://<container>@<account>/<deployment-ID>/savepoints No

    Пример:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • Остановка задания: REST API для остановки текущего выполняемого задания.

    Вариант Значение
    Способ POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    Верхний колонтитул Authorization = "Bearer $token"

    Текст запроса

       {
          "properties": {
            "jobType": "FlinkJob",
            "jobName": "<JOB_NAME>",
            "action": "STOP"
          }
        }
    

    Сведения о свойстве для текста JSON:

    Свойство Описание: Значение по умолчанию Обязательно
    jobType Тип задания. Это должно быть "FlinkJob" Да
    jobName Имя задания, которое используется для запуска задания Да
    действие Это должно быть "STOP" Да

    Пример:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • Запуск задания: REST API для запуска задания ОСТАНОВЛЕННОГО.

    Вариант Значение
    Способ POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    Верхний колонтитул Authorization = "Bearer $token"

    Текст запроса

       {
          "properties": {
             "jobType": "FlinkJob",
             "jobName": "<JOB_NAME>",
             "action": "START",
             "savePointName": "<SAVEPOINT_NAME>"
          }
        }
    

    Сведения о свойстве для текста JSON:

    Свойство Описание: Значение по умолчанию Обязательно
    jobType Тип задания. Это должно быть "FlinkJob" Да
    jobName Имя задания, используемое для запуска задания. Да
    действие Он должен быть "START" Да
    savePointName Сохраните имя точки для запуска задания. Это необязательное свойство, по умолчанию выполняется последняя успешная точка сохранения. No

    Пример:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • Обновление задания: REST API для обновления текущего выполняемого задания.

    Вариант Значение
    Способ POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    Верхний колонтитул Authorization = "Bearer $token"

    Текст запроса

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "UPDATE",
              “args” : “<JOB_JVM_ARGUMENT>”,
              "savePointName": "<SAVEPOINT_NAME>"
          }
      }
    
    

    Сведения о свойстве для текста JSON:

    Свойство Описание: Значение по умолчанию Обязательно
    jobType Тип задания. Это должно быть "FlinkJob" Да
    jobName Имя задания, используемое для запуска задания. Да
    действие Оно всегда должно быть "UPDATE" для запуска нового задания. Да
    args Аргументы JVM задания No
    savePointName Сохраните имя точки для запуска задания. Это необязательное свойство, по умолчанию запуск операции займет последнюю успешную точку сохранения. No

    Пример:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • Задание обновления без отслеживания состояния: REST API для обновления без отслеживания состояния.

    Вариант Значение
    Способ POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    Верхний колонтитул Authorization = "Bearer $token"

    Текст запроса

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "STATELESS_UPDATE",
              “args” : “<JOB_JVM_ARGUMENT>”
          }
      }
    

    Сведения о свойстве для текста JSON:

    Свойство Описание: Значение по умолчанию Обязательно
    jobType Тип задания. Это должно быть "FlinkJob" Да
    jobName Имя задания, используемое для запуска задания. Да
    действие Для запуска нового задания всегда должно быть "STATELESS_UPDATE". Да
    args Аргументы JVM задания No

    Пример:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • Savepoint: REST API для активации точки сохранения для задания.

    Вариант Значение
    Способ POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    Верхний колонтитул Authorization = "Bearer $token"

    Текст запроса

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "SAVEPOINT"
          }
      }
    

    Сведения о свойстве для текста JSON:

    Свойство Описание: Значение по умолчанию Обязательно
    jobType Тип задания. Это должно быть "FlinkJob" Да
    jobName Имя задания, используемое для запуска задания. Да
    действие Он всегда должен быть "SAVEPOINT" для запуска нового задания. Да

    Пример:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • Список точки сохранения: REST API для перечисления всех точек сохранения из каталога savepoint.

    Вариант Значение
    Способ POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    Верхний колонтитул Authorization = "Bearer $token"

    Текст запроса

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "LIST_SAVEPOINT"
          }
      }
    

    Сведения о свойстве для текста JSON:

    Свойство Описание: Значение по умолчанию Обязательно
    jobType Тип задания. Это должно быть "FlinkJob" Да
    jobName Имя задания, используемое для запуска задания Да
    действие Это должно быть "LIST_SAVEPOINT" Да

    Пример:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • Отмена: REST API для отмены задания.

    Вариант Значение
    Способ POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    Верхний колонтитул Authorization = "Bearer $token"

    Текст запроса

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "CANCEL"
          }
      }
    

    Сведения о свойстве для текста JSON:

    Свойство Описание: Значение по умолчанию Обязательно
    jobType Тип задания. Это должно быть FlinkJob Да
    jobName Имя задания, используемое для запуска задания. Да
    действие Оно должно быть ОТМЕНЕНО. Да

    Пример:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • Delete: Rest API для удаления задания.

    Вариант Значение
    Способ POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    Верхний колонтитул Authorization = "Bearer $token"

    Текст запроса

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "DELETE"
          }
      }
    

    Сведения о свойстве для текста JSON:

    Свойство Описание: Значение по умолчанию Обязательно
    jobType Тип задания. Это должно быть "FlinkJob" Да
    jobName Имя задания, используемое для запуска задания. Да
    действие Оно должно быть DELETE. Да

    Пример:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • Список заданий: rest API для перечисления всех заданий и состояния текущего действия.

    Вариант Значение
    Способ GET
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
    Верхний колонтитул Authorization = "Bearer $token"

    Выходные данные:

    { 
      "value": [ 
          { 
              "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", 
              "properties": { 
                  "jobType": "FlinkJob", 
                  "jobName": "job1", 
                  "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", 
                  "jarName": "<JOB_JAR_NAME>", 
                  "action": "STOP", 
                  "entryClass": "<JOB_ENTRY_CLASS>", 
                  "flinkConfiguration": { 
                      "parallelism": "2", 
                      "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" 
                  }, 
                  "jobId": "20e9e907eb360b1c69510507f88cdb7b", 
                  "status": "STOPPED", 
                  "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", 
                  "actionResult": "SUCCESS", 
                  "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" 
          } 
       }
      ]
    }
    

Примечание.

Если любое действие выполняется, actionResult будет указывать его со значением "IN_PROGRESS" при успешном завершении, оно будет отображаться "SUCCESS", и в случае сбоя оно будет "FAILED".

Справочные материалы