分享方式:


AKS 叢集上 HDInsight 中的 Apache Flink® 作業管理

重要

此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群取得更多更新。

AKS 上的 HDInsight 提供一項功能,可透過 Azure 入口網站 (使用者易記介面) 和 ARM Rest API 直接管理和提交 Apache Flink® 作業。

這項功能可讓使用者有效率地控制及監視其 Apache Flink 作業,而不需要深入的叢集層級知識。

福利

  • 簡化的作業管理:在 Azure 入口網站 中原生整合 Apache Flink 時,使用者不再需要大量瞭解 Flink 叢集來提交、管理和監視作業。

  • 用戶易記 REST API:AKS 上的 HDInsight 提供使用者易記的 ARM Rest API 來提交和管理 Flink 作業。 使用者可以使用這些 Rest API,從任何 Azure 服務提交 Flink 作業。

  • 毫不費力的作業更新和狀態管理:原生 Azure 入口網站 整合提供更新作業並還原至其最後儲存狀態(儲存點)的無麻煩體驗。 這項功能可確保整個作業生命週期的持續性和數據完整性。

  • 使用 Azure 管線將 Flink 作業自動化:在 AKS 上使用 HDInsight,Flink 使用者可以存取方便使用的 ARM Rest API,您可以將 Flink 作業順暢地整合到 Azure Pipeline 中。 無論您是啟動新作業、更新執行中作業或執行各種作業作業,此簡化的方法都不需要手動步驟。 它可讓您有效率地管理 Flink 叢集。

必要條件

從入口網站或 Rest API 提交和管理作業之前,有一些必要條件。

  • 在叢集的主要儲存體帳戶中建立目錄,以上傳作業 jar。

  • 如果使用者想要取得儲存點,請在儲存體帳戶中建立作業儲存點的目錄。

    顯示目錄結構的螢幕快照。

主要功能和作業

  • 新的作業提交:使用者可以毫不費力地提交新的 Flink,而不需要複雜的設定或外部工具。

  • 使用儲存點停止和啟動工作:用戶可以從先前的狀態正常停止和啟動其 Flink 作業(Savepoint)。 儲存點可確保保留作業進度,以便順暢地繼續作業。

  • 作業更新:在更新記憶體帳戶上的 jar 之後,使用者可以更新執行中的作業。 此更新會自動取得儲存點,並使用新的 jar 啟動作業。

  • 無狀態更新:透過無狀態更新簡化作業的全新重新啟動。 這項功能可讓使用者使用更新的工作 jar 起始全新重新啟動。

  • 儲存點管理:在任何指定的時刻,使用者可以為其執行中的作業建立儲存點。 您可以列出這些儲存點,並在需要時從特定檢查點重新啟動作業。

  • 取消:這會永久取消作業。

  • 刪除:刪除作業記錄。

在 AKS 上管理 HDInsight 中作業的選項

AKS 上的 HDInsight 提供管理 Flink 作業的方式。

來自 Azure 入口網站的作業管理

若要從入口網站執行 Flink 作業,請移至:

入口網站 --> AKS 叢集集區上的 HDInsight -- Flink 叢集 -->> 設定 --> Flink 作業

顯示如何執行 「flink」 作業的螢幕快照。

  • 新增作業: 若要提交新作業,請將作業 jar 上傳至記憶體帳戶,並建立儲存點目錄。 使用必要的組態完成範本,然後提交作業。

    顯示如何建立新作業的螢幕快照。

    屬性詳細資料:

    屬性 Description 預設值 必要
    作業名稱 作業的唯一名稱。 這會顯示在入口網站上。 作業名稱應為小寫字母。 Yes
    Jar 路徑 工作 jar 的儲存體路徑。 使用者應在叢集儲存體中建立目錄,並上傳作業 jar。 Yes
    Entry 類別 從中開始執行作業之作業的項目類別。 Yes
    Args 作業主要程式的引數。 使用空格分隔所有引數。 No
    平行處理原則 作業 Flink Parallelism。 2 Yes
    savepoint.directory 作業的儲存點目錄。 建議使用者為記憶體帳戶中的工作儲存點建立新的目錄。 abfs://<container>@<account>/<deployment-ID>/savepoints No

    啟動作業之後,入口網站上的工作狀態為 [正在執行]。

  • 停止: 停止作業不需要任何參數,用戶可以選取動作來停止作業。

    顯示使用者如何停止作業的螢幕快照。

    一旦停止作業,入口網站上的工作狀態會是 STOPPED

  • 開始: 此動作會從儲存點啟動作業。 若要啟動作業,請選取已停止的工作並加以啟動。

    顯示用戶啟動作業方式的螢幕快照。

    使用必要的選項填入流程範本並加以啟動。 用戶必須選取使用者想要啟動作業的儲存點。 根據預設,它會採用最後一個成功的儲存點。

    螢幕快照顯示填滿流程範本的方式。

    屬性詳細資料

    屬性 Description 預設值 必要
    Args 作業主要程式的引數。 所有自變數都應該以空格分隔。 No
    上次儲存點 上次成功儲存點的儲存點在停止作業之前。 如果未選取儲存點,則預設會使用此專案。 不可編輯
    儲存點名稱 用戶可以列出作業可用的儲存點,然後選取一個來啟動作業。 No

    作業啟動之後,入口網站上的工作狀態將會是 [正在執行]。

  • 更新: 更新有助於使用更新的作業程序代碼重新啟動作業。 用戶必須在記憶體位置更新最新的作業 jar,並從入口網站更新作業。 此更新會停止儲存點的工作,並使用最新的 jar 再次啟動。

    螢幕快照顯示如何使用更新的作業程序代碼重新啟動作業。

    更新作業的範本。

    顯示更新作業範本的螢幕快照。

    更新作業之後,入口網站上的工作狀態為「執行中」。

  • 無狀態更新: 此作業就像更新,但它牽涉到使用最新程序代碼重新重新啟動作業。

    顯示使用最新程序代碼重新重新啟動作業的螢幕快照。

    更新作業的範本。

    此螢幕快照顯示用於更新無狀態作業的範本。

    屬性詳細資料

    屬性 Description 預設值 必要
    Args 作業主要程式的引數。 使用空格分隔所有自變數。 No

    更新作業之後,入口網站上的工作狀態為 [正在執行]。

  • 儲存點: 取得 Flink 作業的儲存點。

    顯示 Flink 作業儲存點的螢幕快照。

    儲存點是耗時的程式,需要一些時間。 您可以看到作業動作狀態為進行中。

    顯示作業動作狀態的螢幕快照。

  • 取消: 此作業可協助使用者終止作業。

    顯示使用者如何終止作業的螢幕快照。

  • 刪除: 從入口網站刪除作業數據。

    螢幕快照顯示使用者如何從入口網站刪除作業數據。

  • 檢視作業詳細數據: 若要檢視作業詳細數據使用者可以按兩下作業名稱,它會提供作業的詳細數據和最後一個動作結果。

    顯示如何檢視作業詳細數據的螢幕快照。

    對於任何失敗的動作,此作業 json 會提供詳細的例外狀況和失敗原因。

使用 Rest API 進行作業管理

AKS 上的 HDInsight 支援使用者易記的 ARM Rest API 來提交作業和管理作業。 使用此 Flink REST API,您可以順暢地將 Flink 作業作業整合到 Azure Pipeline 中。 無論您是啟動新的作業、更新執行中的作業或執行各種作業作業,此簡化的方法都不需要手動步驟,並讓您有效率地管理 Flink 叢集。

Rest API 的基底 URL 格式

請參閱下列 REST API 的 URL,用戶必須先取代訂用帳戶、資源群組、叢集集集區、叢集名稱和 HDInsight on AKS API 版本,再使用它。 https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}

使用此 REST API,使用者可以起始新的作業、停止作業、啟動作業、建立儲存點、取消作業和刪除作業。 目前的 API_VERSION 是 2023-06-01-preview。

Rest API 驗證

若要驗證 Flink ARM Rest API 使用者,必須取得 ARM 資源的持有人令牌或存取令牌。 若要使用服務主體來驗證 Azure ARM (Azure Resource Manager) REST API,您可以遵循下列一般步驟:

  • 建立服務主體。

    az ad sp create-for-rbac --name <your-SP-name>

  • 為叢集授與 flink SP的擁有者許可權。

  • 使用服務主體登入。

    az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>

  • 取得存取令牌。

    $token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json

    $tok = $token.accesstoken

    用戶可以在顯示的 URL 中使用令牌。

    $data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }

使用受控識別進行驗證: 用戶可以利用支援受控識別的資源來呼叫作業 REST API。 如需詳細資訊,請參閱 受控識別 檔。

API 和參數清單

  • 新增作業: Rest API 以將新作業提交至 Flink。

    選項
    方法 POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁首 Authorization = “Bearer $token”

    要求本文:

    { 
        "properties": { 
            "jobType": "FlinkJob", 
            "jobName": "<JOB_NAME>", 
            "action": "NEW", 
            "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", 
            "jarName": "<JOB_JAR_NAME>", 
            "entryClass": "<JOB_ENTRY_CLASS>", 
            “args”: ”<JOB_JVM_ARGUMENT>”
            "flinkConfiguration": { 
                "parallelism": "<JOB_PARALLELISM>", 
                "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" 
            } 
         } 
    }
    

    JSON 主體的屬性詳細資料:

    屬性 Description 預設值 必要
    jobType 作業類型。 應該是 “FlinkJob” Yes
    jobName 作業的唯一名稱。 這會顯示在入口網站上。 作業名稱應為小寫字母。 Yes
    action 它指出作業上的執行作業類型。 針對新作業啟動,應一律為「新」。 Yes
    jobJarDirectory 作業 jar 目錄的儲存體路徑。 使用者應在叢集儲存體中建立目錄,並上傳作業 jar。 Yes
    jarName 作業 jar 的名稱。 Yes
    entryClass 從中開始執行作業之作業的項目類別。 Yes
    args 作業主要程式的引數。 使用空格分隔引數。 No
    平行處理原則 作業 Flink Parallelism。 2 Yes
    savepoint.directory 作業的儲存點目錄。 建議使用者為記憶體帳戶中的工作儲存點建立新的目錄。 abfs://<container>@<account>/<deployment-ID>/savepoints No

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 停止作業: 停止目前執行作業的 Rest API。

    選項
    方法 POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁首 Authorization = “Bearer $token”

    要求本文

       {
          "properties": {
            "jobType": "FlinkJob",
            "jobName": "<JOB_NAME>",
            "action": "STOP"
          }
        }
    

    JSON 主體的屬性詳細資料:

    屬性 Description 預設值 必要
    jobType 作業類型。 應該是 “FlinkJob” Yes
    jobName 作業名稱,用於啟動作業 Yes
    action 它應該是 “STOP” Yes

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 啟動作業: Rest API 以啟動STOPPED作業。

    選項
    方法 POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁首 Authorization = “Bearer $token”

    要求本文

       {
          "properties": {
             "jobType": "FlinkJob",
             "jobName": "<JOB_NAME>",
             "action": "START",
             "savePointName": "<SAVEPOINT_NAME>"
          }
        }
    

    JSON 主體的屬性詳細資料:

    屬性 Description 預設值 必要
    jobType 作業類型。 應該是 “FlinkJob” Yes
    jobName 用於啟動作業的作業名稱。 Yes
    action 它應該是 “START” Yes
    savePointName 儲存點名稱以啟動作業。 這是選擇性屬性,根據預設,啟動作業會取得最後一個成功的儲存點。 No

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 更新作業: 用於更新目前執行中作業的 Rest API。

    選項
    方法 POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁首 Authorization = “Bearer $token”

    要求本文

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "UPDATE",
              “args” : “<JOB_JVM_ARGUMENT>”,
              "savePointName": "<SAVEPOINT_NAME>"
          }
      }
    
    

    JSON 主體的屬性詳細資料:

    屬性 Description 預設值 必要
    jobType 作業類型。 應該是 “FlinkJob” Yes
    jobName 用於啟動作業的作業名稱。 Yes
    action 它應該一律為「更新」,以供新作業啟動使用。 Yes
    args 作業 JVM 自變數 No
    savePointName 儲存點名稱以啟動作業。 這是選擇性屬性,根據預設,啟動作業會採用最後一個成功的儲存點。 No

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 無狀態更新作業: 無狀態更新的 Rest API。

    選項
    方法 POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁首 Authorization = “Bearer $token”

    要求本文

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "STATELESS_UPDATE",
              “args” : “<JOB_JVM_ARGUMENT>”
          }
      }
    

    JSON 主體的屬性詳細資料:

    屬性 Description 預設值 必要
    jobType 作業類型。 應該是 “FlinkJob” Yes
    jobName 用於啟動作業的作業名稱。 Yes
    action 它應該是「STATELESS_UPDATE」,一律用於新的作業啟動。 Yes
    args 作業 JVM 自變數 No

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 儲存點: Rest API 以觸發作業的儲存點。

    選項
    方法 POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁首 Authorization = “Bearer $token”

    要求本文

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "SAVEPOINT"
          }
      }
    

    JSON 主體的屬性詳細資料:

    屬性 Description 預設值 必要
    jobType 作業類型。 應該是 “FlinkJob” Yes
    jobName 用於啟動作業的作業名稱。 Yes
    action 它應該是 「SAVEPOINT」 一律用於新的作業啟動。 Yes

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 列出儲存點: Rest API,從 savepoint 目錄列出所有儲存點。

    選項
    方法 POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁首 Authorization = “Bearer $token”

    要求本文

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "LIST_SAVEPOINT"
          }
      }
    

    JSON 主體的屬性詳細資料:

    屬性 Description 預設值 必要
    jobType 作業類型。 應該是 “FlinkJob” Yes
    jobName 用於啟動作業的作業名稱 Yes
    action 它應該是「LIST_SAVEPOINT” Yes

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 取消: 待用 API 取消作業。

    選項
    方法 POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁首 Authorization = “Bearer $token”

    要求本文

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "CANCEL"
          }
      }
    

    JSON 主體的屬性詳細資料:

    屬性 Description 預設值 必要
    jobType 作業類型。 它應該是 FlinkJob Yes
    jobName 用於啟動作業的作業名稱。 Yes
    action 它應該是 CANCEL。 Yes

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 刪除: 待用 API 刪除作業。

    選項
    方法 POST
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁首 Authorization = “Bearer $token”

    要求本文

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "DELETE"
          }
      }
    

    JSON 主體的屬性詳細資料:

    屬性 Description 預設值 必要
    jobType 作業類型。 應該是 “FlinkJob” Yes
    jobName 用於啟動作業的作業名稱。 Yes
    action 它應該是 DELETE。 Yes

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 列出作業: Rest API 以列出目前動作的所有作業和狀態。

    選項
    方法 GET
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
    頁首 Authorization = “Bearer $token”

    輸出:

    { 
      "value": [ 
          { 
              "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", 
              "properties": { 
                  "jobType": "FlinkJob", 
                  "jobName": "job1", 
                  "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", 
                  "jarName": "<JOB_JAR_NAME>", 
                  "action": "STOP", 
                  "entryClass": "<JOB_ENTRY_CLASS>", 
                  "flinkConfiguration": { 
                      "parallelism": "2", 
                      "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" 
                  }, 
                  "jobId": "20e9e907eb360b1c69510507f88cdb7b", 
                  "status": "STOPPED", 
                  "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", 
                  "actionResult": "SUCCESS", 
                  "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" 
          } 
       }
      ]
    }
    

注意

當任何動作正在進行中時,actionResult 會以 'IN_PROGRESS' 值表示它成功完成時,它會顯示 'SUCCESS',如果失敗,則會是 'FAILED'。

參考