你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
HDInsight on AKS 群集中的 Apache Flink® 作业管理
注意
我们将于 2025 年 1 月 31 日停用 Azure HDInsight on AKS。 在 2025 年 1 月 31 日之前,你需要将工作负荷迁移到 Microsoft Fabric 或同等的 Azure 产品,以避免工作负荷突然终止。 订阅上的剩余群集会被停止并从主机中移除。
在停用日期之前,仅提供基本支持。
重要
此功能目前以预览版提供。 Microsoft Azure 预览版的补充使用条款包含适用于 beta 版、预览版或其他尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight on AKS 预览版信息。 如有疑问或功能建议,请在 AskHDInsight 上提交请求并附上详细信息,并关注我们以获取 Azure HDInsight Community 的更多更新。
HDInsight on AKS 提供了通过 Azure 门户(用户友好界面)和 ARM Rest API 直接管理和提交 Apache Flink 作业的功能。
此功能使用户能够高效控制和监视其 Apache Flink 作业,而无需深入的群集级别知识。
好处
简化的作业管理:通过在 Azure 门户中对 Apache Flink 的本机集成,用户不再需要对 Flink 集群有深入了解,即可提交、管理和监控作业。
用户友好 REST API:HDInsight on AKS 提供用户友好的 ARM Rest API 来提交和管理 Flink 作业。 用户可以使用这些 Rest API 从任何 Azure 服务提交 Flink 作业。
轻松进行作业更新和状态管理:本机 Azure 门户集成提供轻松体验,同于更新作业并将其还原到其上次保存的状态(保存点)。 此功能可确保整个作业生命周期的连续性和数据完整性。
使用 Azure Pipeline 自动执行 Flink 作业:借助 HDInsight on AKS,Flink 用户有权访问用户友好的 ARM Rest API,可以将 Flink 作业操作无缝集成到 Azure Pipeline 中。 无论是启动新作业、更新正在运行的作业还是执行各种作业操作,这种简化的方法都可消除手动步骤。 它使你能够有效地管理 Flink 群集。
先决条件
在从门户或 Rest API 提交和管理作业之前,需要满足一些先决条件。
主要功能和操作
新作业提交:用户可以轻松提交新的 Flink,而无需复杂的配置或外部工具。
使用保存点停止和启动作业:用户可以从容地从之前的状态(保存点)停止和启动其 Flink 作业。 保存点可确保保留作业进度,从而实现无缝恢复。
作业更新:更新存储帐户上的 jar 后,用户可以更新正在运行的作业。 此更新会自动获取保存点,并使用新的 jar 启动作业。
无状态更新:通过无状态更新简化作业的全新重启。 此功能允许用户使用更新的作业 jar 启动干净重启。
保存点管理:在任何给定时刻,用户都可以为其正在运行的作业创建保存点。 可以列出这些保存点,并将其根据需要用于从特定检查点重启作业。
取消:这会永久取消作业。
删除:删除作业历史记录记录。
用于在 HDInsight on AKS 中管理作业的选项
HDInsight on AKS 提供了管理 Flink 作业的方法。
从 Azure 门户进行作业管理
若要从门户运行 Flink 作业,请转到:
门户 --> HDInsight on AKS 群集池 --> Flink 群集 --> 设置 --> Flink 作业
新作业: 若要提交新作业,请将作业 jar 上传到存储帐户并创建保存点目录。 使用必要的配置完成模板,然后提交作业。
属性详细信息:
properties Description 默认值 必需 作业名称 作业的唯一名称。 这显示在门户上。 作业名称应为小写字母。 是 Jar 路径 作业 jar 的存储路径。 用户应在群集存储中创建目录并上传作业 jar。 是 条目类 从中开始作业执行的作业的条目类。 是 参数 作业主程序的参数。 请用空格分隔所有参数。 否 度 作业 Flink 并行度。 2 是 savepoint.directory 作业的保存点目录。 建议用户为存储帐户中的作业保存点创建新目录。 abfs://<container>@<account>/<deployment-ID>/savepoints
否 启动作业后,门户上的作业状态为 RUNNING。
停止:停止作业不需要任何参数,用户可以通过选择该操作来停止作业。
作业停止后,门户上的作业状态为 STOPPED。
开始: 此操作从保存点启动作业。 若要启动作业,请选择已停止的作业并启动它。
使用所需的选项填充流模板并启动它。 用户需要选择要从中启动作业的保存点。 默认情况下,它采用最后一个成功的保存点。
属性详细信息:
properties Description 默认值 必需 参数 作业主程序的参数。 应使用空格分隔所有参数。 否 上一保存点 停止作业之前的最后一个成功保存点。 如果未选择保存点,则默认使用此选项。 不可编辑 保存点名称 用户可以列出作业的可用保存点,并选择一个用于启动作业的保存点。 否 启动作业后,门户上的作业状态将为 RUNNING。
更新: 更新有助于使用更新的作业代码重启作业。 用户需要在存储位置更新最新的作业 jar,并从门户更新作业。 此更新使用 savepoint 停止作业,并使用最新的 jar 再次启动。
用于更新作业的模板。
更新作业后,门户上的作业状态为“RUNNING”。
无状态更新: 此作业类似于更新,但它涉及使用最新代码重新重启作业。
用于更新作业的模板。
属性详细信息:
properties Description 默认值 必需 参数 作业主程序的参数。 请用空格分隔所有参数。 否 启动作业后,门户上的作业状态为“RUNNING”。
保存点: 获取 Flink 作业的保存点。
保存点是耗时的过程,需要一些时间。 可以看到作业操作状态为正在进行。
取消: 此作业可帮助用户终止作业。
删除: 从门户删除作业数据。
查看作业详细信息: 若要查看作业详细信息,用户可以单击作业名称,它会提供有关作业的详细信息以及上一操作结果。
对于任何失败的操作,此作业 json 提供详细的异常和失败原因。
使用 Rest API 进行作业管理
HDInsight on AKS 支持使用用户友好的 ARM Rest API 来提交和管理作业。 使用此 Flink REST API,可以将 Flink 作业操作无缝集成到 Azure Pipeline 中。 无论是启动新作业、更新正在运行的作业还是执行各种作业操作,这种简化的方法都消除了手动步骤,并使你能够高效地管理 Flink 群集。
Rest API 的基 URL 格式
请参阅以下 URL 以获取 REST API,用户在使用之前,需要先替换订阅、资源组、群集池、群集名称和 HDInsight on AKS API 版本。
https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}
使用此 REST API,用户可以启动新作业、停止作业、启动作业、创建保存点、取消作业和删除作业。 当前 API_VERSION 为 2023-06-01-preview。
Rest API 身份验证
若要对 Flink ARM Rest API 用户进行身份验证,需要获取 ARM 资源的持有者令牌或访问令牌。 若要使用服务主体对 Azure ARM(Azure 资源管理器)REST API 进行身份验证,可以遵循以下常规步骤:
创建服务主体。
az ad sp create-for-rbac --name <your-SP-name>
为
flink
群集授予 SP 的所有者权限。使用服务主体登录。
az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>
获取访问令牌。
$token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json
$tok = $token.accesstoken
用户可以在显示的 URL 中使用令牌。
$data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }
使用托管标识进行身份验证: 用户可以利用支持托管标识的资源来调用作业 REST API。 有关详细信息,请参阅托管标识文档。
API 和参数列表
新作业: 用于将新作业提交到 Flink 的 Rest API。
选项 值 方法 POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Header 授权 = “Bearer $token” 请求正文:
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "NEW", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "entryClass": "<JOB_ENTRY_CLASS>", “args”: ”<JOB_JVM_ARGUMENT>” "flinkConfiguration": { "parallelism": "<JOB_PARALLELISM>", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" } } }
JSON 正文的属性详细信息:
properties Description 默认值 必需 jobType 作业类型。 它应为“FlinkJob” 是 jobName 作业的唯一名称。 这显示在门户上。 作业名称应为小写字母。 是 action 它指示作业的操作类型。 对于新作业启动,它应始终为“NEW”。 是 jobJarDirectory 作业 jar 目录的存储路径。 用户应在群集存储中创建目录并上传作业 jar。 是 jarName 作业 jar 的名称。 是 entryClass 从中开始作业执行的作业的条目类。 是 args 作业主程序的参数。 用空格分隔参数。 否 度 作业 Flink 并行度。 2 是 savepoint.directory 作业的保存点目录。 建议用户为存储帐户中的作业保存点创建新目录。 abfs://<container>@<account>/<deployment-ID>/savepoints
否 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
停止作业:用于停止当前正在运行的作业的 Rest API。
选项 值 方法 POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Header 授权 = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STOP" } }
JSON 正文的属性详细信息:
properties Description 默认值 必需 jobType 作业类型。 它应为“FlinkJob” 是 jobName 作业名称,用于启动作业 是 action 它应为“STOP” 是 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
启动作业: 用于启动 STOPPED 作业的 Rest API。
选项 值 方法 POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Header 授权 = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "START", "savePointName": "<SAVEPOINT_NAME>" } }
JSON 正文的属性详细信息:
properties Description 默认值 必需 jobType 作业类型。 它应为“FlinkJob” 是 jobName 用于启动作业的作业名称。 是 action 它应为“START” 是 savePointName 保存点名称以启动作业。 它是可选的属性,默认情况下启动操作会采用上一成功的保存点。 否 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
更新作业: 用于更新当前正在运行的作业的 Rest API。
选项 值 方法 POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Header 授权 = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "UPDATE", “args” : “<JOB_JVM_ARGUMENT>”, "savePointName": "<SAVEPOINT_NAME>" } }
JSON 正文的属性详细信息:
properties Description 默认值 必需 jobType 作业类型。 它应为“FlinkJob” 是 jobName 用于启动作业的作业名称。 是 action 对于新作业启动,它应始终为“更新”。 是 args 作业 JVM 参数 否 savePointName 保存点名称以启动作业。 它是可选的属性,默认情况下启动操作将采用上一成功的保存点。 否 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
无状态更新作业: 用于无状态更新的 Rest API。
选项 值 方法 POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Header 授权 = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STATELESS_UPDATE", “args” : “<JOB_JVM_ARGUMENT>” } }
JSON 正文的属性详细信息:
properties Description 默认值 必需 jobType 作业类型。 它应为“FlinkJob” 是 jobName 用于启动作业的作业名称。 是 action 对于新作业启动,它应始终为“STATELESS_UPDATE”。 是 args 作业 JVM 参数 否 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
保存点:用于触发作业的保存点的 Rest API。
选项 值 方法 POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Header 授权 = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "SAVEPOINT" } }
JSON 正文的属性详细信息:
properties Description 默认值 必需 jobType 作业类型。 它应为“FlinkJob” 是 jobName 用于启动作业的作业名称。 是 action 对于新作业启动,它应始终为“SAVEPOINT”。 是 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
列出保存点: 用于列出 savepoint 目录中的所有保存点的 Rest API。
选项 值 方法 POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Header 授权 = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "LIST_SAVEPOINT" } }
JSON 正文的属性详细信息:
properties Description 默认值 必需 jobType 作业类型。 它应为“FlinkJob” 是 jobName 用于启动作业的作业名称 是 action 它应为“LIST_SAVEPOINT” 是 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
取消: 用于取消作业的 Rest API。
选项 值 方法 POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Header 授权 = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "CANCEL" } }
JSON 正文的属性详细信息:
properties Description 默认值 必需 jobType 作业类型。 它应为: FlinkJob
是 jobName 用于启动作业的作业名称。 是 action 它应为 CANCEL。 是 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
删除: 用于删除作业的 Rest API。
选项 值 方法 POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Header 授权 = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "DELETE" } }
JSON 正文的属性详细信息:
properties Description 默认值 必需 jobType 作业类型。 它应为“FlinkJob” 是 jobName 用于启动作业的作业名称。 是 action 它应为 DELETE。 是 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
列出作业: 用于列出当前操作的所有作业和状态的 Rest API。
选项 值 Method GET URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
Header 授权 = “Bearer $token” 输出:
{ "value": [ { "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", "properties": { "jobType": "FlinkJob", "jobName": "job1", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "action": "STOP", "entryClass": "<JOB_ENTRY_CLASS>", "flinkConfiguration": { "parallelism": "2", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" }, "jobId": "20e9e907eb360b1c69510507f88cdb7b", "status": "STOPPED", "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", "actionResult": "SUCCESS", "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" } } ] }
注意
当任何操作正在进行时,actionResult 将指示其值为“IN_PROGRESS”,如果成功完成,它将显示“SUCCESS”,如果失败,它将为“FAILED”。
参考
- Apache Flink 作业计划
- Apache、Apache Flink、Flink 和关联的开源项目名称是 Apache Software Foundation (ASF) 的商标。