AKS クラスターでの HDInsight での Apache Flink® ジョブ管理
大事な
AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 についてさらに知りたい場合は、このお知らせをご覧ください。
ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。
大事な
この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案については、AskHDInsight に詳細を記載したリクエストを提出し、Azure HDInsight Community の更新情報をフォローしてご覧ください。
AKS 上の HDInsight には、Azure portal (ユーザー フレンドリ インターフェイス) と ARM Rest API を介して Apache Flink® ジョブを直接管理および送信する機能が用意されています。
この機能により、ユーザーはクラスター レベルの深い知識を必要とせずに、Apache Flink ジョブを効率的に制御および監視できます。
利点
簡素化されたジョブ管理: Azure portal での Apache Flink のネイティブ統合により、ユーザーはジョブの送信、管理、監視に Flink クラスターに関する広範な知識を必要としなくなりました。
User-Friendly REST API: AKS 上の HDInsight には、Flink ジョブを送信および管理するためのわかりやすい ARM Rest API が用意されています。 ユーザーは、これらの Rest API を使用して、任意の Azure サービスから Flink ジョブを送信できます。
簡単なジョブの更新と状態管理の: ネイティブの Azure portal 統合により、ジョブを更新し、最後に保存した状態 (セーブポイント) に復元するための手間のかからないエクスペリエンスが提供されます。 この機能により、ジョブのライフサイクル全体にわたって継続性とデータの整合性が確保されます。
Azure パイプラインを使用して Flink ジョブを自動化する: AKS 上の HDInsight を使用すると、Flink ユーザーはわかりやすい ARM Rest API にアクセスでき、Flink ジョブ操作を Azure Pipeline にシームレスに統合できます。 新しいジョブを起動する場合でも、実行中のジョブを更新する場合でも、さまざまなジョブ操作を実行する場合でも、この合理化されたアプローチでは手動の手順が不要になります。 Flink クラスターを効率的に管理できます。
前提 条件
ポータルまたは Rest API からジョブを送信して管理する前に、いくつかの前提条件があります。
クラスターのプライマリ ストレージ アカウントにディレクトリを作成して、ジョブ jar をアップロードします。
ユーザーがセーブポイントを取得する場合は、ジョブセーブポイントのストレージ アカウントにディレクトリを作成します。
主な機能と操作
新しいジョブの送信: ユーザーは簡単に新しい Flink を送信できるため、複雑な構成や外部ツールが不要になります。
セーブポイントでジョブを停止および開始する: ユーザーは、以前の状態 (セーブポイント) から Flink ジョブを正常に停止して開始できます。 セーブポイントを使用すると、ジョブの進行状況が確実に維持され、シームレスな再開が可能です。
ジョブの更新: ユーザーは、ストレージ アカウントの jar を更新した後、実行中のジョブを更新できます。 この更新では、セーブポイントが自動的に取得され、新しい jar でジョブが開始されます。
ステートレス更新: ジョブの新しい再起動の実行は、ステートレス更新によって簡略化されます。 この機能により、ユーザーは更新されたジョブ jar を使用してクリーンな再起動を開始できます。
セーブポイント管理: ユーザーはいつでも、実行中のジョブのセーブポイントを作成できます。 これらのセーブポイントを一覧表示し、必要に応じて特定のチェックポイントからジョブを再開するために使用できます。
キャンセル: これはジョブを完全に取り消します。
削除: ジョブ履歴レコードを削除します。
AKS 上の HDInsight でジョブを管理するためのオプション
AKS 上の HDInsight には、Flink ジョブを管理する方法が用意されています。
Azure portal からのジョブ管理
ポータルから Flink ジョブを実行するには、次の手順に進みます。
ポータル -- AKS クラスター プールでの HDInsight の> -- Flink クラスターの> --> 設定 -- Flink ジョブの>
新しいジョブ: 新しいジョブを送信するには、ジョブ jar をストレージ アカウントにアップロードし、セーブポイント ディレクトリを作成します。 必要な構成でテンプレートを完成させ、ジョブを送信します。
プロパティの詳細:
財産 説明 既定値 必須 ジョブ名 ジョブの一意の名前。 これはポータルに表示されます。 ジョブ名は小さい後者にする必要があります。 はい JARファイルのパス ジョブ ジャーのストレージパス。 ユーザーはクラスター ストレージにディレクトリを作成し、ジョブ jar をアップロードする必要があります。 はい 初級クラス ジョブの実行を開始するジョブのエントリ クラス。 はい 引数 ジョブのメイン プログラムの引数。 すべての引数をスペースで区切ります。 いいえ 並列性 ジョブ Flink 並列処理。 2 はい savepoint.directory (セーブポイントのディレクトリ) ジョブのセーブポイント ディレクトリ。 ユーザーは、ストレージ アカウントにジョブ セーブポイント用の新しいディレクトリを作成することをお勧めします。 abfs://<container>@<account>/<deployment-ID>/savepoints
いいえ ジョブが起動すると、ポータルのジョブの状態は RUNNINGです。
停止: ジョブの停止にパラメーターは必要ありませんでした。ユーザーはアクションを選択してジョブを停止できます。
ジョブが停止すると、ポータルのジョブの状態は 停止になります。
開始: このアクションは、セーブポイントからジョブを開始します。 ジョブを開始するには、停止したジョブを選択して開始します。
フロー テンプレートに必要なオプションを入力し、開始します。 ユーザーは、ジョブを開始するセーブポイントを選択する必要があります。 既定では、最後に成功したセーブポイントが取得されます。
プロパティの詳細:
財産 説明 既定値 必須 引数 ジョブのメインプログラムに対する引数。 すべての引数はスペースで区切る必要があります。 いいえ 最後のセーブポイント ジョブを停止する前に、最後に成功したセーブポイントの取得。 これは、セーブポイントが選択されていない場合に既定で使用されます。 編集不可 保存ポイント名 ユーザーは、ジョブに使用できるセーブポイントを一覧表示し、1 つを選択してジョブを開始できます。 いいえ ジョブが開始されると、ポータルのジョブの状態は RUNNINGになります。
更新: 更新は、更新されたジョブ コードを使用してジョブを再起動するのに役立ちます。 ユーザーは、ストレージの場所で最新のジョブ jar を更新し、ポータルからジョブを更新する必要があります。 この更新プログラムは、セーブポイントを使用してジョブを停止し、最新の jar で再度開始します。
ジョブを更新するためのテンプレート。
ジョブが更新されると、ポータルのジョブの状態は "RUNNING" になります。
ステートレス更新: このジョブは更新に似ていますが、最新のコードでジョブを新たに再起動する必要があります。
ジョブを更新するためのテンプレート。
プロパティの詳細:
財産 説明 既定値 必須 引数 ジョブのメインプログラムに対する引数。 すべての引数をスペースで区切ります。 いいえ ジョブが更新されると、ポータルのジョブの状態は RUNNING になります。
セーブポイント: Flink ジョブのセーブポイントを取得します。
セーブポイントは時間のかかるプロセスであり、時間がかかります。 ジョブ アクションの状態を進行中として確認できます。
キャンセル: このジョブは、ユーザーがジョブを終了するのに役立ちます。
削除: ポータルから仕事データを削除します。
ジョブの詳細の表示: ジョブの詳細を表示するには、ユーザーがジョブ名をクリックすると、ジョブと最後のアクションの結果に関する詳細が表示されます。
失敗したアクションの場合、このジョブ JSON は詳細な例外と失敗の理由を示します。
rest API を使用したジョブ管理の
AKS 上の HDInsight では、ジョブを送信してジョブを管理するためのわかりやすい ARM Rest API がサポートされています。 この Flink REST API を使用すると、Flink ジョブ操作を Azure Pipeline にシームレスに統合できます。 新しいジョブを起動する場合でも、実行中のジョブを更新する場合でも、さまざまなジョブ操作を実行する場合でも、この合理化されたアプローチにより、手動の手順が不要になり、Flink クラスターを効率的に管理できるようになります。
Rest API のベース URL 形式
REST API の次の URL を参照してください。この API を使用する前に、ユーザーは AKS API バージョンのサブスクリプション、リソース グループ、クラスター プール、クラスター名、HDInsight を置き換える必要があります。
https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}
この REST API を使用すると、ユーザーは新しいジョブの開始、ジョブの停止、ジョブの開始、セーブポイントの作成、ジョブの取り消し、ジョブの削除を行うことができます。 現在のAPI_VERSIONは 2023-06-01-preview です。
REST API 認証
Flink ARM Rest API ユーザーを認証するには、ARM リソースのベアラー トークンまたはアクセス トークンを取得する必要があります。 サービス プリンシパルを使用して Azure ARM (Azure Resource Manager) REST API を認証するには、次の一般的な手順に従います。
サービス プリンシパルを作成します。
az ad sp create-for-rbac --name <your-SP-name>
flink
クラスターの SP に所有者アクセス許可を付与します。サービス プリンシパルを使用してログインします。
az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>
アクセス トークンを取得します。
$token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json
$tok = $token.accesstoken
ユーザーは、表示された URL でトークンを使用できます。
$data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }
マネージド ID を使用した認証の : ユーザーは、マネージド ID をサポートするリソースを利用して、ジョブ REST API を呼び出すことができます。 詳細については、マネージド ID ドキュメントを参照してください。
API とパラメーターの一覧
新しいジョブ: Flink に新しいジョブを送信するための REST API
オプション 価値 方式 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダ Authorization = "Bearer $token" 要求本文:
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "NEW", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "entryClass": "<JOB_ENTRY_CLASS>", “args”: ”<JOB_JVM_ARGUMENT>” "flinkConfiguration": { "parallelism": "<JOB_PARALLELISM>", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" } } }
JSON 本文のプロパティの詳細:
財産 説明 既定値 必須 職種 ジョブの種類。 "FlinkJob" と記載されるべきです はい ジョブ名 ジョブの一意の名称。 これはポータルに表示されます。 ジョブ名は小文字にする必要があります。 はい アクション 仕事の操作種類を示します。 新しいジョブを起動する場合、常に"NEW"を使用する必要があります。 はい ジョブジャーディレクトリ ジョブJARディレクトリの保存パス。 ユーザーはクラスター ストレージにディレクトリを作成し、ジョブ jar をアップロードする必要があります。 はい jarName ジョブ jar の名前。 はい entryClass ジョブの実行を開始するジョブのエントリ クラス。 はい args ジョブのメインプログラムに対する引数。 引数をスペースで区切ります。 いいえ 並列性 ジョブ Flinkの並列処理。 2 はい savepoint.directory ジョブ用のセーブポイントディレクトリ。 ユーザーは、ストレージ アカウントにジョブ セーブポイント用の新しいディレクトリを作成することをお勧めします。 abfs://<container>@<account>/<deployment-ID>/savepoints
いいえ 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
ジョブの停止: 現在実行中のジョブを停止するための REST APIを提供します。
オプション 価値 方式 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダ Authorization = "Bearer $token" リクエストボディ
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STOP" } }
JSON 本文のプロパティの詳細:
財産 説明 既定値 必須 職種 ジョブの種類。 『FlinkJob』とすべきです はい ジョブ名 ジョブ名。ジョブの起動に使用されます。 はい アクション "STOP" にする必要があります はい 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
ジョブの開始: REST API を使用して、停止中のジョブを開始します。
オプション 価値 方式 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダ Authorization = "Bearer $token" 要求本文の
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "START", "savePointName": "<SAVEPOINT_NAME>" } }
JSON ボディのプロパティ詳細:
財産 説明 既定値 必須 職種 ジョブの種類。 "FlinkJob" という名前にする必要があります はい ジョブ名 ジョブの起動に使用されるジョブ名。 はい アクション 「START」にするべきです。 はい セーブポイント名 ポイント名を保存してジョブを開始します。 これは省略可能なプロパティです。既定では、開始操作は最後に成功したセーブポイントを受け取ります。 いいえ 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
更新ジョブ: 現在実行中のジョブを更新するための rest API を します。
オプション 価値 方式 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダ Authorization = "Bearer $token" リクエストボディ
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "UPDATE", “args” : “<JOB_JVM_ARGUMENT>”, "savePointName": "<SAVEPOINT_NAME>" } }
JSON 本文のプロパティの詳細:
財産 説明 既定値 必須 職種 ジョブの種類。 "FlinkJob" にする必要があります はい ジョブ名 ジョブの起動に使用されるジョブ名。 はい アクション 新しいジョブを起動する場合は常に "UPDATE" にする必要があります。 はい args ジョブ JVM 引数 いいえ セーブポイント名 ポイント名を保存してジョブを開始します。 これは省略可能なプロパティです。既定では、開始操作は最後に成功したセーブポイントを受け取ります。 いいえ 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
ステートレス更新ジョブ: ステートレス更新用の rest API を します。
オプション 価値 方式 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダ 認証 = "Bearer $token" リクエスト本文
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STATELESS_UPDATE", “args” : “<JOB_JVM_ARGUMENT>” } }
JSON 本文のプロパティの詳細:
財産 説明 既定値 必須 職種 ジョブの種類。 "FlinkJob" に設定してください はい ジョブ名 ジョブの起動に使用されるジョブ名。 はい アクション 新しいジョブの起動には、常に "STATELESS_UPDATE" にする必要があります。 はい 引数 (args) ジョブ JVM 引数 いいえ 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
セーブポイント: ジョブのセーブポイントをトリガーするための REST API。
オプション 価値 方式 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダ Authorization = "Bearer $token" 要求本文の
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "SAVEPOINT" } }
JSON 本文のプロパティ詳細:
財産 説明 既定値 必須 職種 ジョブの種類。 「FlinkJob」にするべきです。 はい ジョブ名 ジョブの起動に使用されるジョブ名。 はい アクション 新しいジョブを起動する場合は、必ず "SAVEPOINT" にしてください。 はい 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
保存ポイントの一覧表示: Rest API を使用して、セーブポイント ディレクトリからすべてのセーブポイントを一覧表示します。
オプション 価値 方式 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダ Authorization = "Bearer $token" リクエストボディ
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "LIST_SAVEPOINT" } }
JSON 本文のプロパティ詳細:
財産 説明 既定値 必須 職種 ジョブの種類。 「FlinkJob」と設定する必要があります はい jobName ジョブの起動に使用されるジョブ名 はい アクション "LIST_SAVEPOINT" にする必要があります はい 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
キャンセル: REST API を使用してジョブを取り消します。
オプション 価値 方式 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダ Authorization = "Bearer $token" リクエストボディ
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "CANCEL" } }
JSONプロパティの詳細:
財産 説明 既定値 必須 職種 ジョブの種類。 FlinkJob
であるべきですはい ジョブ名 ジョブの起動に使用されるジョブ名。 はい アクション キャンセルにする必要があります。 はい 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
削除:ジョブを削除するための REST API。
オプション 価値 方式 投稿 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
ヘッダ Authorization = "Bearer $token" リクエストボディ
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "DELETE" } }
JSON 本文のプロパティの詳細:
財産 説明 既定値 必須 職種 ジョブの種類。 "FlinkJob" にする必要があります はい ジョブ名 ジョブの起動に使用されるジョブ名。 はい アクション DELETE にする必要があります。 はい 例:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
ジョブの一覧表示: REST API を使用して、現在のアクションのすべてのジョブとその状態を一覧表示します。
オプション 価値 方式 取得 URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
ヘッダ Authorization = "ベアラー $token" 出力:
{ "value": [ { "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", "properties": { "jobType": "FlinkJob", "jobName": "job1", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "action": "STOP", "entryClass": "<JOB_ENTRY_CLASS>", "flinkConfiguration": { "parallelism": "2", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" }, "jobId": "20e9e907eb360b1c69510507f88cdb7b", "status": "STOPPED", "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", "actionResult": "SUCCESS", "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" } } ] }
手記
アクションが進行中の場合、actionResult は値 'IN_PROGRESS' を持つアクションを示します。正常に完了すると、"SUCCESS" と表示され、失敗した場合は 'FAILED' になります。
参考
- Apache Flink ジョブのスケジューリング
- Apache、Apache Flink、Flink、関連するオープンソースプロジェクト名は、Apache Software Foundation (ASF) の 商標です。