HDInsight on AKS クラスターにおける Apache Flink® ジョブ管理
Note
Azure HDInsight on AKS は 2025 年 1 月 31 日に廃止されます。 2025 年 1 月 31 日より前に、ワークロードを Microsoft Fabric または同等の Azure 製品に移行することで、ワークロードの突然の終了を回避する必要があります。 サブスクリプション上に残っているクラスターは停止され、ホストから削除されることになります。
提供終了日まで基本サポートのみ利用できます。
重要
現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加の使用条件」に記載されています。 この特定のプレビューについては、「Microsoft HDInsight on AKS のプレビュー情報」を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新情報については、Azure HDInsight コミュニティのフォローをお願いいたします。
HDInsight on AKS は、Azure portal (使いやすいインターフェイス) と ARM REST API を使って Apache Flink® ジョブを直接管理および送信するための機能が用意されています。
この機能により、ユーザーは Apache Flink ジョブを効率的に制御および監視できます。クラスター レベルの深い知識は必要ありません。
福利厚生
簡素化されたジョブ管理: Azure portal で Apache Flink がネイティブに統合され、ユーザーにジョブを送信、管理、監視するための Flink クラスターに関する広範な知識が不要になりました。
使いやすい REST API: HDInsight on AKS には、Flink ジョブを送信および管理するためのわかりやすい ARM REST API が用意されています。 ユーザーは、これらの REST API を使用して、任意の Azure サービスから Flink ジョブを送信できます。
簡単なジョブ更新と状態管理: Azure portal とのネイティブ統合により、手間のかからないジョブ更新および最後に保存した状態 (セーブポイント) への復元エクスペリエンスが実現します。 この機能により、ジョブのライフサイクル全体にわたって継続性とデータ整合性が確保されます。
Azure パイプラインを使用した Flink ジョブの自動化: HDInsight on AKS を使用することで、Flink ユーザーがわかりやすい ARM REST API にアクセスでき、Flink ジョブ操作を Azure パイプラインにシームレスに統合できます。 新しいジョブを起動する場合でも、実行中のジョブを更新する場合でも、さまざまなジョブ操作を実行する場合でも、この効率的なアプローチにより手動操作が不要になります。 これにより Flink クラスターを効率的に管理できます。
前提条件
ポータルまたは REST API からジョブを送信して管理するには、前提条件がいくつかあります。
クラスターのプライマリ ストレージ アカウントにディレクトリを作成して、ジョブ jar をアップロードします。
ユーザーがセーブポイントを取得する必要がある場合は、ジョブ セーブポイントのストレージ アカウントにディレクトリを作成します。
主要な機能と操作
新しいジョブの送信: ユーザーが簡単に新しい Flink を送信できるため、複雑な構成や外部ツールが不要になります。
セーブポイントを使用したジョブの停止と開始: ユーザーが以前の状態 (セーブポイント) から Flink ジョブを適切に停止および開始できます。 セーブポイントを使用すると、ジョブの進行状況が確実に維持され、シームレスに再開できます。
ジョブの更新: ユーザーがストレージ アカウント上の jar を更新した後、実行中のジョブを更新できます。 この更新により、セーブポイントが自動的に取得され、新しい jar でジョブが開始されます。
ステートレス更新: ジョブを最新の情報に更新する作業が、ステートレス更新によって簡略化されます。 この機能により、ユーザーは更新されたジョブ jar を使用してクリーンな再起動を開始できます。
セーブポイント管理: ユーザーがいつでも、実行中のジョブのセーブポイントを作成できます。 これらのセーブポイントを一覧表示し、必要に応じて特定のチェックポイントからジョブを再開するときに使用できます。
キャンセル: ジョブが完全に取り消されます。
削除: ジョブ履歴レコードを削除します。
HDInsight on AKS でのジョブ管理オプション
HDInsight on AKS には Flink ジョブを管理する方法が用意されています。
Azure portal からのジョブ管理
ポータルから Flink ジョブを実行するには、次の順番で操作します。
ポータル --> HDInsight on AKS クラスター プール --> Flink クラスター --> 設定 --> Flink ジョブ
新しいジョブ: 新しいジョブを送信するには、ジョブ jar をストレージ アカウントにアップロードし、セーブポイント ディレクトリを作成します。 必要な構成でテンプレートを完成させて、ジョブを送信します。
プロパティの詳細:
プロパティ Description 既定値 Mandatory ジョブ名 ジョブの一意の名前。 これはポータルに表示されます。 ジョブ名は小文字にする必要があります。 はい ジョブのパス ジョブ jar のストレージ パス。 ユーザーはクラスター ストレージにディレクトリを作成し、ジョブ jar をアップロードする必要があります。 はい Entry クラス ジョブ実行を開始するジョブのエントリ クラス。 はい 引数 ジョブのメイン プログラムの引数。 すべての引数を空白で区切ります。 いいえ 度 ジョブ Flink 並列処理。 2 はい savepoint.directory ジョブのセーブポイント ディレクトリ。 ユーザーがストレージ アカウントにジョブ セーブポイント用の新しいディレクトリを作成することをお勧めします。 abfs://<container>@<account>/<deployment-ID>/savepoints
いいえ ジョブが起動すると、ポータルのジョブの状態は実行中になります。
停止: ジョブの停止にパラメーターは必要ありませんでした。ユーザーはアクションを選択してジョブを停止できます。
ジョブが停止すると、ポータルのジョブの状態は停止済みになります。
開始: このアクションでは、ジョブはセーブポイントから開始されます。 ジョブを開始するには、停止したジョブを選択し、開始します。
フロー テンプレートに必要なオプションを入力し、開始します。 ユーザーは、ジョブを開始するセーブポイントを選択する必要があります。 既定では、最後に成功したセーブポイントが取得されます。
プロパティの詳細:
プロパティ Description 既定値 Mandatory 引数 ジョブのメイン プログラムの引数。 すべての引数を空白で区切る必要があります。 いいえ Last savepoint ジョブが停止される前に最後に成功したセーブポイント。 セーブポイントが選択されていない場合に既定で使用されます。 編集不可 Save point name ユーザーがジョブに使用できるセーブポイントを一覧表示し、その 1 つを選択してジョブを開始できます。 いいえ ジョブが開始されると、ポータルのジョブの状態は実行中になります。
更新: 更新は、更新されたジョブ コードを使用してジョブを再起動するのに役立ちます。 ユーザーは、保存場所で最新のジョブ jar を更新し、ポータルからジョブを更新する必要があります。 この更新操作により、ジョブはセーブポイントを使用して停止され、最新の jar で再開されます。
ジョブを更新するためのテンプレート。
ジョブが更新されると、ポータルのジョブの状態は "実行中" になります。
ステートレス更新: このジョブは更新に似ていますが、最新のコードを使用してジョブを再起動する必要があります。
ジョブを更新するためのテンプレート。
プロパティの詳細:
プロパティ Description 既定値 Mandatory 引数 ジョブのメイン プログラムの引数。 すべての引数を空白で区切ります。 いいえ ジョブが更新されると、ポータルのジョブの状態は実行中になります。
セーブポイント: Flink ジョブのセーブポイントを取得します。
セーブポイントは時間のかかるプロセスです。 ジョブ アクションの状態は進行中になります。
キャンセル: このジョブは、ユーザーがジョブを終了するのに役立ちます。
削除: ポータルからジョブ データを削除します。
ジョブの詳細の表示: ジョブの詳細を表示するには、ユーザーがジョブ名をクリックします。これにより、ジョブと最後のアクションの結果の詳細が表示されます。
失敗したアクションの場合、このジョブ JSON により、失敗の例外と理由が詳しく表示されます。
REST API を使用したジョブ管理
HDInsight on AKS では、ジョブを送信して管理するためのわかりやすい ARM Rest API がサポートされています。 この Flink REST API を使用すると、Flink ジョブ操作を Azure パイプラインにシームレスに統合できます。 新しいジョブを起動する場合でも、実行中のジョブを更新する場合でも、さまざまなジョブ操作を実行する場合でも、この効率的なアプローチにより手動操作が不要になり、Flink クラスターを効率的に管理できます。
REST API のベース URL 形式
次の REST API の URL を参照してください。ユーザーがこれを使用するには、まずサブスクリプション、リソース グループ、クラスター プール、クラスター名、HDInsight on AKS の API バージョンを置き換える必要があります。
https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}
この REST API を使用すると、ユーザーは新しいジョブの開始、ジョブの停止、ジョブの開始、セーブポイントの作成、ジョブの取り消し、ジョブの削除を行うことができます。 現在の API_VERSION は 2023-06-01-preview です。
REST API 認証
Flink ARM REST API ユーザーを認証するには、ARM リソースのベアラー トークンまたはアクセス トークンを取得する必要があります。 サービス プリンシパルを使用して Azure ARM (Azure Resource Manager) REST API を認証するには、次の一般的な手順に従います。
サービス プリンシパルを作成します。
az ad sp create-for-rbac --name <your-SP-name>
flink
クラスターの SP に所有者アクセス許可を付与します。サービス プリンシパルを使用してログインします。
az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>
アクセス トークンを取得します。
$token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json
$tok = $token.accesstoken
ユーザーは、表示された URL でトークンを使用できます。
$data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }
マネージド ID を使用した認証: ユーザーは、マネージド ID をサポートするリソースを使用して、ジョブ REST API を呼び出すことができます。 詳細については、マネージド ID に関するドキュメントを参照してください。
API とパラメーターの一覧
新しいジョブ: Flink に新しいジョブを送信する REST API。
オプション 値 方法 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダー Authorization = "Bearer $token" 要求本文:
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "NEW", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "entryClass": "<JOB_ENTRY_CLASS>", “args”: ”<JOB_JVM_ARGUMENT>” "flinkConfiguration": { "parallelism": "<JOB_PARALLELISM>", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" } } }
JSON 本文のプロパティの詳細:
プロパティ Description 既定値 Mandatory ジョブ タイプ ジョブの種類。 これは "FlinkJob" になるはずです はい jobName ジョブの一意の名前。 これはポータルに表示されます。 ジョブ名は小文字にする必要があります。 はい action ジョブの操作の種類を示します。 新しいジョブ起動の場合は必ず "NEW" になります。 はい jobJarDirectory ジョブ jar ディレクトリのストレージ パス。 ユーザーはクラスター ストレージにディレクトリを作成し、ジョブ jar をアップロードする必要があります。 はい jarName ジョブ jar の名前。 はい entryClass ジョブ実行を開始するジョブのエントリ クラス。 はい args ジョブのメイン プログラムの引数。 引数を空白で区切ります。 いいえ 度 ジョブ Flink 並列処理。 2 はい savepoint.directory ジョブのセーブポイント ディレクトリ。 ユーザーがストレージ アカウントにジョブ セーブポイント用の新しいディレクトリを作成することをお勧めします。 abfs://<container>@<account>/<deployment-ID>/savepoints
いいえ 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
ジョブの停止: 現在実行中のジョブを停止するための REST API。
オプション 値 方法 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダー Authorization = "Bearer $token" 要求本文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STOP" } }
JSON 本文のプロパティの詳細:
プロパティ Description 既定値 Mandatory ジョブ タイプ ジョブの種類。 これは "FlinkJob" になるはずです はい jobName ジョブ名。ジョブの起動に使用されます はい action これは "STOP" になるはずです はい 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
ジョブの開始: 停止したジョブを開始する REST API。
オプション 値 方法 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダー Authorization = "Bearer $token" 要求本文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "START", "savePointName": "<SAVEPOINT_NAME>" } }
JSON 本文のプロパティの詳細:
プロパティ Description 既定値 Mandatory ジョブ タイプ ジョブの種類。 これは "FlinkJob" になるはずです はい jobName ジョブ名。ジョブの起動に使用されます。 はい action これは "START" になるはずです はい savePointName ポイント名を保存してジョブを開始します。 これは省略可能なプロパティです。既定では、開始操作では最後に成功したセーブポイントが使用されます。 いいえ 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
ジョブの更新: 現在実行中のジョブを更新するための REST API。
オプション 値 方法 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダー Authorization = "Bearer $token" 要求本文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "UPDATE", “args” : “<JOB_JVM_ARGUMENT>”, "savePointName": "<SAVEPOINT_NAME>" } }
JSON 本文のプロパティの詳細:
プロパティ Description 既定値 Mandatory ジョブ タイプ ジョブの種類。 これは "FlinkJob" になるはずです はい jobName ジョブ名。ジョブの起動に使用されます。 はい action 新しいジョブ起動の場合は必ず "UPDATE" になります。 はい args ジョブ JVM 引数 いいえ savePointName ポイント名を保存してジョブを開始します。 これは省略可能なプロパティです。既定では、開始操作では最後に成功したセーブポイントが使用されます。 いいえ 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
ジョブのステートレス更新: ステートレス更新用の REST API。
オプション 値 方法 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダー Authorization = "Bearer $token" 要求本文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STATELESS_UPDATE", “args” : “<JOB_JVM_ARGUMENT>” } }
JSON 本文のプロパティの詳細:
プロパティ Description 既定値 Mandatory ジョブ タイプ ジョブの種類。 これは "FlinkJob" になるはずです はい jobName ジョブ名。ジョブの起動に使用されます。 はい action 新しいジョブ起動の場合は必ず "STATELESS_UPDATE" になります。 はい args ジョブ JVM 引数 いいえ 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
セーブポイント: ジョブのセーブポイントをトリガーする REST API。
オプション 値 方法 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダー Authorization = "Bearer $token" 要求本文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "SAVEPOINT" } }
JSON 本文のプロパティの詳細:
プロパティ Description 既定値 Mandatory ジョブ タイプ ジョブの種類。 これは "FlinkJob" になるはずです はい jobName ジョブ名。ジョブの起動に使用されます。 はい action 新しいジョブ起動の場合は必ず "SAVEPOINT" になります。 はい 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
セーブポイントの一覧表示: セーブポイント ディレクトリのすべてのセーブポイントを一覧表示する REST API。
オプション 値 方法 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダー Authorization = "Bearer $token" 要求本文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "LIST_SAVEPOINT" } }
JSON 本文のプロパティの詳細:
プロパティ Description 既定値 Mandatory ジョブ タイプ ジョブの種類。 これは "FlinkJob" になるはずです はい jobName ジョブの起動に使用されるジョブ名 はい action これは "LIST_SAVEPOINT" になるはずです はい 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
キャンセル: ジョブを取り消す REST API。
オプション 値 方法 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダー Authorization = "Bearer $token" 要求本文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "CANCEL" } }
JSON 本文のプロパティの詳細:
プロパティ Description 既定値 Mandatory ジョブ タイプ ジョブの種類。 これは FlinkJob
になるはずですはい jobName ジョブ名。ジョブの起動に使用されます。 はい action これは CANCEL になるはずです。 はい 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
削除: ジョブを削除する REST API。
オプション 値 方法 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダー Authorization = "Bearer $token" 要求本文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "DELETE" } }
JSON 本文のプロパティの詳細:
プロパティ Description 既定値 Mandatory ジョブ タイプ ジョブの種類。 これは "FlinkJob" になるはずです はい jobName ジョブ名。ジョブの起動に使用されます。 はい action これは DELETE になるはずです。 はい 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
ジョブの一覧表示: すべてのジョブと現在のアクションの状態を一覧表示する REST API。
オプション 値 方法 GET URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
ヘッダー Authorization = "Bearer $token" 出力:
{ "value": [ { "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", "properties": { "jobType": "FlinkJob", "jobName": "job1", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "action": "STOP", "entryClass": "<JOB_ENTRY_CLASS>", "flinkConfiguration": { "parallelism": "2", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" }, "jobId": "20e9e907eb360b1c69510507f88cdb7b", "status": "STOPPED", "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", "actionResult": "SUCCESS", "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" } } ] }
Note
アクションが進行中の場合、actionResult は値 "IN_PROGRESS" によってそのことを示します。正常に完了した場合は "SUCCESS" が表示され、失敗した場合は "FAILED" になります。
リファレンス
- Apache Flink ジョブのスケジューリング
- Apache、Apache Flink、Flink、関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の商標です。