AKS 群集上的 HDInsight 中的 Apache Flink® 作业管理
重要
AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 进一步了解 ,请查看此公告。
需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。
重要
此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款 包括适用于 beta 阶段、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight on AKS 预览版信息。 有关问题或功能建议,请在 AskHDInsight 上提交请求并提供详细信息,关注我们以获取 Azure HDInsight 社区 的更多更新。
AKS 上的 HDInsight 提供一项功能,用于通过 Azure 门户(用户友好界面)和 ARM Rest API 直接管理和提交 Apache Flink® 作业。
此功能使用户能够高效控制和监视其 Apache Flink 作业,而无需深入的群集级别知识。
好处
简化的作业管理:通过 Azure 门户中的 Apache Flink 原生集成,用户无需具备深厚的 Flink 群集知识即可提交、管理和监控作业。
User-Friendly REST API:AKS 上的 HDInsight 提供用户友好的 ARM Rest API 来提交和管理 Flink 作业。 用户可以使用这些 Rest API 从任何 Azure 服务提交 Flink 作业。
轻松的作业更新和状态管理:原生 Azure 门户集成提供了更新作业并无缝恢复到其上次保存状态(保存点)的便利体验。 此功能可确保整个作业生命周期的连续性和数据完整性。
使用 Azure Pipeline自动执行 Flink 作业:在 AKS 上使用 HDInsight,Flink 用户可以访问用户友好的 ARM REST API,将 Flink 作业无缝集成到 Azure Pipeline 中。 无论是启动新作业、更新正在运行的作业还是执行各种作业作,这种简化的方法都消除了手动步骤。 它使你能够有效地管理 Flink 群集。
先决条件
在从门户或 Rest API 提交和管理作业之前,需要满足一些先决条件。
关键功能和操作
新作业提交:用户可以毫不费力地提交新的 Flink 作业,而无需复杂的配置或外部工具。
停止和启动具有保存点的作业:用户可以正常停止和启动其 Flink 作业(保存点)。 保存点可确保保留作业进度,从而实现无缝恢复。
作业更新:更新存储帐户上的 jar 后,用户可以更新正在运行的作业。 此更新会自动获取保存点,并使用新的 jar 启动作业。
无状态更新:通过无状态更新简化作业的重新启动。 此功能允许用户使用更新的任务包进行清洁重启。
Savepoint 管理:在任何给定时刻,用户可以为其正在运行的作业创建保存点。 可以列出这些保存点,并用于根据需要从特定检查点重启作业。
取消:这会永久取消任务。
删除:删除作业历史记录。
用于在 AKS 上的 HDInsight 中管理作业的选项
AKS 上的 HDInsight 提供了管理 Flink 作业的方法。
从 Azure 门户 作业管理
若要从门户运行 Flink 作业,请转到:
门户 -- AKS 群集池上的> HDInsight --> Flink 群集 --> 设置 --> Flink 作业
新作业: 若要提交新作业,请将作业 jar 上传到存储帐户并创建保存点目录。 使用必要的配置完成模板,然后提交作业。
属性详细信息:
财产 描述 默认值 强制的 作业名称 任务的唯一名称。 这会显示在门户网站上。 工作名称应为小写字母。 是的 Jar 文件路径 作业 jar 的存储路径。 用户应该在集群存储中创建一个目录,并上传作业的 jar 文件。 是的 Entry 类 用于启动作业执行的作业入口类。 是的 Args 用于作业主程序的参数。 使用空格分隔所有参数。 不 排比 作业 Flink 并行度。 2 是的 savepoint.directory 作业的保存点目录。 建议用户在存储帐户中为作业保存点创建一个新目录。 abfs://<container>@<account>/<deployment-ID>/savepoints
不 启动作业后,门户上的作业状态为 RUNNING。
停止: 停止作业不需要任何参数,用户可以通过选择动作来停止作业。
作业停止后,门户上的作业状态为 STOPPED。
开始: 此操作从保存点启动任务。 若要启动作业,请选择已停止的作业并启动它。
使用所需的选项填充流模板并启动它。 用户需要选择一个保存点,以便用户可以从该保存点启动作业。 默认情况下,它采用最后一个成功的保存点。
属性详细信息:
财产 描述 默认值 强制性的 Args 作业主程序的参数说明。 所有参数都应该用空格分开。 不 上次保存点 在停止作业之前进行的上次成功保存点。 如果未选择保存点,则默认使用此选项。 不可编辑 保存点名称 用户可以列出作业的可用保存点,并选择一个用于启动作业的保存点。 不 启动作业后,门户上的作业状态将是运行中。
更新: 更新有助于使用更新的作业代码重启作业。 用户需要在存储位置更新最新的作业 jar,并从门户更新作业。 此更新使用 savepoint 停止作业,并使用最新的 jar 再次启动。
用于更新作业的模板。
更新作业后,门户上的作业状态为“正在运行”。
无状态更新: 此作业类似于更新,但它涉及到使用最新代码重新重启作业。
用于更新作业的模板。
属性详细信息:
财产 描述 默认值 强制的 Args 作业主程序的参数。 用空格分隔所有参数。 不 更新作业后,门户上的作业状态为“正在运行”。
保存点: 获取 Flink 作业的保存点。
保存点是耗时的过程,需要一些时间。 可以看到作业操作状态正在进行。
取消: 此作业可帮助用户终止作业。
删除: 删除门户中的作业数据。
查看作业详细信息: 若要查看作业详细信息,用户可以单击作业名称,它提供有关作业的详细信息和最后一个作结果。
对于任何失败的操作,此任务的JSON提供了详细的异常信息和失败原因。
使用 Rest API 的作业管理
AKS 上的 HDInsight 支持用户友好的 ARM Rest API 来提交作业和管理作业。 使用此 Flink REST API,可以将 Flink 作业作无缝集成到 Azure Pipeline 中。 无论是启动新作业、更新正在运行的作业还是执行各种作业作,这种简化的方法都消除了手动步骤,并使您能够高效地管理 Flink 群集。
Rest API 的基 URL 格式
请参阅以下 REST API URL,用户在使用此 API 之前,需要先替换 AKS API 版本的订阅、资源组、群集池、群集名称和 HDInsight。
https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}
使用此 REST API,用户可以启动新作业、停止作业、启动作业、创建保存点、取消作业和删除作业。 当前API_VERSION为 2023-06-01-preview。
Rest API 身份验证
若要对 Flink ARM Rest API 用户进行身份验证,需要获取 ARM 资源的持有者令牌或访问令牌。 若要使用服务主体对 Azure ARM(Azure 资源管理器)REST API 进行身份验证,可以遵循以下常规步骤:
创建服务主体。
az ad sp create-for-rbac --name <your-SP-name>
为 SP 授予
flink
群集的所有者权限。使用服务主体登录。
az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>
获取访问令牌。
$token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json
$tok = $token.accesstoken
用户可以在显示的 URL 中使用令牌。
$data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }
使用托管标识进行身份验证: 用户可以利用支持托管标识的资源来调用作业 REST API。 有关详细信息,请参阅 托管标识 文档。
API 和参数列表
新作业: Rest API 将新作业提交到 Flink。
选择 价值 方法 发布 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
页眉 Authorization = “Bearer $token” 请求正文:
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "NEW", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "entryClass": "<JOB_ENTRY_CLASS>", “args”: ”<JOB_JVM_ARGUMENT>” "flinkConfiguration": { "parallelism": "<JOB_PARALLELISM>", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" } } }
JSON 对象主体的 属性详细信息:
财产 描述 默认值 强制的 职位类型 作业类型。 它应该是“FlinkJob” 是的 作业名称 任务的唯一名称。 这会显示在门户网站上。 作业名称应使用小写字母。 是的 行动 它指示任务中的操作类型。 对于新作业启动,应始终使用“新建”。 是的 jobJarDirectory 作业 jar 目录的存储路径。 用户应在群集存储中创建目录并上传任务 jar 文件。 是的 jarName 作业任务包的名称。 是的 entryClass 用于启动作业执行的作业入口类。 是的 args 作业主程序的参数。 用空格分隔参数。 不 平行性 Flink“任务并行度” 2 是的 savepoint.directory 作业的保存点目录。 建议用户为存储帐户中的作业保存点创建新目录。 abfs://<container>@<account>/<deployment-ID>/savepoints
不 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
停止作业:用于停止当前正在运行的作业的 Rest API。
选择 价值 方法 发布 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
页眉 Authorization = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STOP" } }
以下是 JSON 正文的属性详细信息:
财产 描述 默认值 强制的 职位类型 作业类型。 应该是“FlinkJob” 是的 作业名称 作业名称,用于启动作业 是的 行动 它应为“STOP” 是的 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
启动作业: 使用 REST API 启动已停止的作业。
选项 价值 方法 发布 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
页眉 Authorization = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "START", "savePointName": "<SAVEPOINT_NAME>" } }
JSON 正文的 属性详细信息:
财产 描述 默认值 强制的 工作类型 作业类型。 它应该是“FlinkJob” 是的 作业名称 用于启动作业的作业名称。 是的 行动 它应为“START” 是的 保存点名称 保存点名称以启动作业。 这是一个可选属性,默认情况下,启动操作会以最后一次成功的保存点为基础。 不 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
更新作业:用于更新当前正在运行的作业的 Rest API。
选择 价值 方法 发布 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
页眉 Authorization = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "UPDATE", “args” : “<JOB_JVM_ARGUMENT>”, "savePointName": "<SAVEPOINT_NAME>" } }
JSON 正文的 属性详细信息:
财产 描述 默认值 强制的 职位类型 作业类型。 应该是“FlinkJob” 是的 工作名称 用于启动作业的作业名称。 是的 行动 对于新作业启动,应该始终使用“UPDATE”。 是的 args 作业 JVM 参数 不 保存点名称 保存点名称以启动作业。 该属性是可选的,默认情况下,启动操作将采用最后一次成功的保存点。 不 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
无状态更新作业: 提供用于无状态更新的 Rest API。
选择 价值 方法 发布 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
页眉 Authorization = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STATELESS_UPDATE", “args” : “<JOB_JVM_ARGUMENT>” } }
JSON 正文的 属性详细信息:
财产 描述 默认值 必需的 职位类型 作业类型。 它应该是 “FlinkJob”。 是的 任务名称 用于启动作业的作业名称。 是的 行动 启动新作业时,始终应为“STATELESS_UPDATE”。 是的 args 作业 JVM 参数 不 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
保存点: Rest API 来触发作业的保存点。
选项 价值 方法 发布 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
页眉 Authorization = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "SAVEPOINT" } }
JSON 正文的 属性详细信息:
财产 描述 默认值 强制性的 工作类型 作业类型。 它应该是“FlinkJob” 是的 职位名称 用于启动作业的作业名称。 是的 行动 对于新任务启动,它应始终为“SAVEPOINT”。 是的 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
列表保存点: Rest API 列出 savepoint 目录中的所有保存点。
选项 价值 方法 发布 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
页眉 Authorization = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "LIST_SAVEPOINT" } }
JSON 正文的 属性详细信息:
财产 描述 默认值 强制的 工作类型 作业类型。 它应该是“FlinkJob” 是的 作业名称 用于启动作业的作业名称 是的 行动 它应为“LIST_SAVEPOINT” 是的 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
取消: REST API 用于取消作业。
选择 价值 方法 发布 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
页眉 Authorization = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "CANCEL" } }
JSON 正文的 属性详细信息:
财产 描述 默认值 强制的 职位类型 作业类型。 它应该是 FlinkJob
是的 作业名称 用于启动作业的作业名称。 是的 行动 它应为取消。 是的 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
删除: Rest API 删除作业。
选项 价值 方法 发布 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
页眉 Authorization = “Bearer $token” 请求正文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "DELETE" } }
JSON 数据包体的 属性详细信息:
财产 描述 默认值 强制的 职位类型 作业类型。 它应该是“FlinkJob” 是的 作业名称 用于启动作业的作业名称。 是的 行动 它应为 DELETE。 是的 示例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
列出作业: Rest API 列出所有作业及其当前状态。
选择 价值 方法 获取 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
页眉 Authorization = “Bearer $token” 输出:
{ "value": [ { "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", "properties": { "jobType": "FlinkJob", "jobName": "job1", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "action": "STOP", "entryClass": "<JOB_ENTRY_CLASS>", "flinkConfiguration": { "parallelism": "2", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" }, "jobId": "20e9e907eb360b1c69510507f88cdb7b", "status": "STOPPED", "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", "actionResult": "SUCCESS", "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" } } ] }
注意
当任何操作正在进行时,actionResult 将显示值“IN_PROGRESS”;成功完成时,它将显示“SUCCESS”;如果失败,则显示“FAILED”。
参考
- Apache Flink 作业调度
- Apache、Apache Flink、Flink 和相关的开源项目名称是 属于 Apache 软件基金会(ASF)的商标。