共用方式為


AKS 叢集上 HDInsight 中的 Apache Flink® 作業管理

重要

AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解

您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。

重要

這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的資訊,請參閱 在 AKS 上的 Azure HDInsight 預覽資訊。 如有問題或功能建議,請提交請求至 AskHDInsight,並關注我們以獲取Azure HDInsight 社群的更多更新。

AKS 上的 HDInsight 提供一項功能,可透過 Azure 入口網站(使用者易記介面)和 ARM Rest API 直接管理和提交 Apache Flink® 作業。

這項功能可讓使用者有效率地控制及監視其 Apache Flink 作業,而不需要深入的叢集層級知識。

好處

  • 簡化的作業管理:在 Azure 入口網站中原生整合 Apache Flink 時,使用者不再需要大量瞭解 Flink 叢集來提交、管理和監視作業。

  • User-Friendly REST API:AKS 上的 HDInsight 提供方便使用的 ARM Rest API 來提交和管理 Flink 作業。 使用者可以使用這些 Rest API,從任何 Azure 服務提交 Flink 作業。

  • 輕鬆的工作更新和狀態管理:原生 Azure 入口網站整合提供工作更新及將其還原至最後儲存狀態(儲存點)的方便體驗。 這項功能可確保整個作業生命週期的持續性和數據完整性。

  • 使用 Azure 管線將 Flink 作業自動化:在 AKS 上使用 HDInsight,Flink 使用者即可存取方便使用的 ARM Rest API,您可以順暢地將 Flink 作業作業整合到 Azure Pipeline 中。 無論您是啟動新作業、更新執行中作業或執行各種作業作業,此簡化的方法都不需要手動步驟。 它可讓您有效率地管理 Flink 叢集。

先決條件

在從入口網站或 REST API 提交和管理作業之前,有一些先決條件。

  • 在叢集的主要儲存帳戶中建立目錄,上傳作業 jar。

  • 如果使用者想要取得保存點,請在儲存帳戶中建立一個作業保存點的目錄。

    螢幕快照顯示目錄結構。

主要功能和作業

  • 新作業提交:使用者可以輕鬆地提交新的 Flink,無需進行複雜的設定或使用外部工具。

  • 停止並啟動具有儲存點的工作:使用者可以正常停止和啟動其 Flink 工作,從先前的狀態 (Savepoint)。 儲存點可確保保留作業進度,以便順暢地繼續作業。

  • 作業更新:在更新儲存帳戶上的 JAR 檔案後,使用者可以更新執行中的作業。 此更新會自動取得儲存點,並使用新的 jar 啟動作業。

  • 無狀態更新:透過無狀態更新簡化工作從頭開始的重啟。 這項功能可讓使用者使用更新的工作 Jar 包啟動乾淨的重啟。

  • Savepoint 管理:在任何指定時間,用戶可以為其執行中的作業建立儲存點。 您可以列出這些儲存點,並在需要時從特定檢查點重新啟動作業。

  • 取消:這會永久取消作業。

  • 刪除:刪除作業記錄。

在 AKS 上管理 HDInsight 中作業的選項

AKS 上的 HDInsight 提供管理 Flink 作業的方式。

從 Azure 入口網站 作業管理

若要從入口網站執行 Flink 作業,請移至:

入口網站 --> AKS 叢集集區上的 HDInsight --> Flink 叢集 --> 設定 --> Flink 作業

螢幕快照顯示如何執行「flink」作業。

  • 新作業: 若要提交新作業,請將作業 jar 上傳至記憶體帳戶並建立儲存點目錄。 使用必要的組態完成範本,然後提交作業。

    截圖顯示如何建立新工作。

    屬性詳細數據:

    財產 描述 預設值 強制的
    作業名稱 作業的唯一名稱。 這會顯示在入口網站上。 作業名稱應為小寫字母。 是的
    Jar 路徑 作業 jar 的儲存路徑。 用戶應該在叢集記憶體中建立目錄,並上傳作業 jar。 是的
    進入類別 作業執行的入口類別。 是的
    參數 工作主要程式的參數。 使用空格分隔所有自變數。
    排比 作業 Flink 並行處理。 2 是的
    savepoint.directory 作業的 Savepoint 目錄。 建議使用者為記憶體帳戶中的工作儲存點建立新的目錄。 abfs://<container>@<account>/<deployment-ID>/savepoints

    啟動作業之後,入口網站上的工作狀態會顯示為 RUNNING

  • 停止: 停止作業不需要任何參數,使用者可以選取動作來停止作業。

    螢幕快照顯示使用者如何停止作業。

    一旦停止作業,入口網站上的工作狀態會 STOPPED

  • 開始: 此動作會從儲存點啟動作業。 若要啟動作業,請選取已停止的工作並加以啟動。

    螢幕快照顯示用戶啟動作業的方式。

    請填寫流程範本所需的選項並開始。 用戶必須選取使用者想要啟動作業的儲存點。 預設情況下,它會採用最後一個成功的儲存點。

    螢幕快照顯示填滿流程範本的方式。

    物業詳細資料

    財產 描述 預設值 強制的
    參數 工作主程式的參數。 所有參數都應該以空格分隔。
    上次儲存點 停止作業前的最後一次成功儲存點。 如果未選取儲存點,則會預設使用此設定。 不可編輯
    儲存點名稱 用戶可以列出作業可用的儲存點,然後選取一個來啟動作業。

    作業啟動後,平台上的工作狀態會 執行中

  • 更新: Update 可協助使用更新的作業程式代碼重新啟動作業。 用戶必須在儲存位置更新最新的工作 jar 檔案,並從入口網站更新工作。 此更新會使用保存點停止作業,並以最新的 jar 檔案重新啟動。

    螢幕快照顯示如何使用更新的作業程序代碼重新啟動作業。

    更新作業的範本。

    螢幕快照顯示更新作業的範本。

    更新作業之後,入口網站上的工作狀態為「執行中」。

  • 無狀態更新: 此作業就像是更新,但它牽涉到使用最新程序代碼重新重新啟動作業。

    螢幕快照顯示作業以最新的代碼重新開始。

    更新作業的範本。

    螢幕截圖顯示用於更新無狀態作業的範本。

    物業詳細資料

    財產 描述 預設值 強制的
    參數 工作主要程式的參數。 使用空格分隔所有自變數。

    更新作業之後,入口網站上的工作狀態為 [正在執行]。

  • 儲存點: 取得 Flink 作業的儲存點。

    螢幕快照顯示 Flink 作業的儲存點。

    儲存點是耗時的程式,需要一些時間。 您可以看到作業動作狀態為進行中。

    螢幕快照顯示作業動作狀態。

  • 取消: 此作業可協助使用者終止作業。

    螢幕快照顯示使用者如何終止作業。

  • 刪除: 從網站刪除工作資料。

    螢幕快照顯示使用者如何從入口網站刪除作業數據。

  • 檢視作業詳細數據: 若要檢視作業詳細數據,使用者可以按兩下作業名稱,它會提供作業的詳細數據和最後一個動作結果。

    螢幕快照顯示如何檢視作業詳細數據。

    對於任何失敗的動作,此作業 json 會提供詳細的例外狀況和失敗原因。

工作管理使用 REST API

AKS 上的 HDInsight 支援使用者友好的 ARM REST API 來提交和管理作業。 使用此 Flink REST API,您可以順暢地將 Flink 作業作業整合到 Azure Pipeline 中。 無論您是啟動新的作業、更新執行中的作業或執行各種作業作業,此簡化的方法都不需要手動步驟,並讓您有效率地管理 Flink 叢集。

Rest API 的基底 URL 格式

請參閱下列的 REST API URL,用戶必須先更換訂用帳戶、資源群組、叢集集區、叢集名稱,以及 HDInsight on AKS 的 API 版本,再使用它。 https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}

使用此 REST API,使用者可以起始新的作業、停止作業、啟動作業、建立儲存點、取消作業和刪除作業。 目前的API_VERSION是 2023-06-01-preview。

Rest API 驗證

若要驗證 Flink ARM Rest API 使用者,必須取得 ARM 資源的持有人令牌或存取令牌。 若要使用服務主體來驗證 Azure ARM (Azure Resource Manager) REST API,您可以遵循下列一般步驟:

  • 建立服務主體。

    az ad sp create-for-rbac --name <your-SP-name>

  • 授權SP對flink叢集擁有者許可權。

  • 使用服務主體登入。

    az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>

  • 取得存取令牌。

    $token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json

    $tok = $token.accesstoken

    用戶可以在顯示的 URL 中使用令牌。

    $data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }

使用受控識別進行驗證: 使用者可以利用支援受控識別的資源來呼叫作業 REST API。 如需詳細資訊,請參閱 受控識別 檔。

API 和參數清單

  • 新作業: Rest API,將新作業提交至 Flink。

    選擇 價值
    方法 發佈
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁眉 Authorization = “Bearer $token”

    請求主體:

    { 
        "properties": { 
            "jobType": "FlinkJob", 
            "jobName": "<JOB_NAME>", 
            "action": "NEW", 
            "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", 
            "jarName": "<JOB_JAR_NAME>", 
            "entryClass": "<JOB_ENTRY_CLASS>", 
            “args”: ”<JOB_JVM_ARGUMENT>”
            "flinkConfiguration": { 
                "parallelism": "<JOB_PARALLELISM>", 
                "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" 
            } 
         } 
    }
    

    JSON 本文的 屬性詳細數據:

    財產 描述 預設值 強制的
    職位類型 作業的類型。 應該是 “FlinkJob” 是的
    工作名稱 作業的唯一名稱。 這會顯示在入口網站上。 作業名稱應為小寫字母。 是的
    行動 它指出工作中的操作類型。 它應該是「新增」,一律用於新的作業啟動。 是的
    jobJarDirectory 作業 Jar 目錄的儲存路徑。 用戶應該在叢集記憶體中建立目錄,並上傳作業 jar。 是的
    jarName 作業檔案的名稱。 是的
    入口類別 從中開始執行作業之作業的項目類別。 是的
    args 作業主要程式的參數。 使用空格分隔自變數。
    排比 作業 Flink 並行性。 2 是的
    savepoint.directory 作業的 Savepoint 目錄。 建議使用者在儲存帳戶中為作業保存點建立新的目錄。 abfs://<container>@<account>/<deployment-ID>/savepoints

    例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 停止作業: Rest API 來停止目前執行中的作業。

    選項 價值
    方法 發佈
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁眉 Authorization = “Bearer $token”

    請求正文

       {
          "properties": {
            "jobType": "FlinkJob",
            "jobName": "<JOB_NAME>",
            "action": "STOP"
          }
        }
    

    JSON 主體的屬性詳細資料:

    財產 描述 預設值 強制性的
    職位類型 作業的類型。 應該是 “FlinkJob” 是的
    jobName 作業名稱,用於啟動作業 是的
    行動 它應該是 “STOP” 是的

    例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 啟動作業: 使用 REST API 启动已停止的作業。

    選擇 價值
    方法 貼文
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁眉 Authorization = “Bearer $token”

    請求正文

       {
          "properties": {
             "jobType": "FlinkJob",
             "jobName": "<JOB_NAME>",
             "action": "START",
             "savePointName": "<SAVEPOINT_NAME>"
          }
        }
    

    JSON 正文的 屬性詳細數據:

    財產 描述 預設值 強制的
    職業類型 作業的類型。 應該是 “FlinkJob” 是的
    jobName 用於啟動作業的作業名稱。 是的
    行動 應該為 “START” 是的
    儲存點名稱 儲存點名稱以啟動作業。 這是可選屬性,預設情況下,啟動作業會使用最後一個成功的儲存點。

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 更新作業:用於更新目前執行中作業的 Rest API。

    選項 價值
    方法 發佈
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁眉 Authorization = “Bearer $token”

    請求內容

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "UPDATE",
              “args” : “<JOB_JVM_ARGUMENT>”,
              "savePointName": "<SAVEPOINT_NAME>"
          }
      }
    
    

    JSON 本文的 屬性詳細資訊:

    財產 描述 預設值 強制性
    工作類型 作業的類型。 應該是 “FlinkJob” 是的
    職位名稱 用於啟動作業的作業名稱。 是的
    行動 它應該一律為「更新」,以供新作業啟動使用。 是的
    args 作業 JVM 自變數
    savePointName 儲存點名稱以啟動作業。 這是選擇性屬性,根據預設,啟動作業會採用最後一個成功的儲存點。

    例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 無狀態更新作業: REST API 用於無狀態更新。

    選擇 價值
    方法 發佈
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁眉 Authorization = “Bearer $token”

    請求內文

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "STATELESS_UPDATE",
              “args” : “<JOB_JVM_ARGUMENT>”
          }
      }
    

    JSON 的屬性詳細數據:

    財產 描述 預設值 強制的
    職位類型 作業的類型。 應該是 “FlinkJob” 是的
    工作名稱 用於啟動作業的作業名稱。 是的
    行動 它應該是「STATELESS_UPDATE」,一律用於新的作業啟動。 是的
    args 作業 JVM 自變數

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • Savepoint: Rest API 來觸發作業的儲存點。

    選擇 價值
    方法 發佈
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁眉 Authorization = “Bearer $token”

    請求內文

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "SAVEPOINT"
          }
      }
    

    JSON 本文的屬性詳細資訊:

    財產 描述 預設值 強制性的
    職位類型 作業的類型。 應該是 “FlinkJob” 是的
    工作名稱 用於啟動作業的作業名稱。 是的
    行動 新的作業啟動應該始終使用「SAVEPOINT」。 是的

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 列出儲存點: Rest API,從 savepoint 目錄列出所有儲存點。

    選項 價值
    方法 發佈
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁眉 Authorization = “Bearer $token”

    請求體

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "LIST_SAVEPOINT"
          }
      }
    

    JSON 主體中 屬性詳細數據:

    財產 描述 預設值 強制的
    職務類型 作業的類型。 應該是 “FlinkJob” 是的
    作業名稱 用於啟動作業的作業名稱 是的
    行動 它應該是「LIST_SAVEPOINT」 是的

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 取消: Rest API 取消作業。

    選擇 價值
    方法 貼文
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁眉 Authorization = “Bearer $token”

    請求主體

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "CANCEL"
          }
      }
    

    JSON 主體的 屬性詳細數據:

    財產 描述 預設值 強制的
    工作類型 作業的類型。 它應該是 FlinkJob 是的
    工作名稱 用於啟動作業的作業名稱。 是的
    行動 它應該是取消。 是的

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 刪除: Rest API 刪除作業。

    選擇 價值
    方法 發佈
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
    頁眉 Authorization = “Bearer $token”

    請求主體

      {
          "properties": {
              "jobType": "FlinkJob",
              "jobName": "<JOB_NAME>",
              "action": "DELETE"
          }
      }
    

    JSON 本文的 屬性詳細數據:

    財產 描述 預設值 強制的
    工作類型 作業的類型。 應該是 “FlinkJob” 是的
    工作名稱 用於啟動作業的作業名稱。 是的
    行動 它應該是刪除。 是的

    範例:

    Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"

  • 列出作業: 使用 Rest API 列出所有作業及當前動作的狀態。

    選擇 價值
    方法 獲取
    URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
    頁眉 Authorization = “Bearer $token”

    輸出:

    { 
      "value": [ 
          { 
              "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", 
              "properties": { 
                  "jobType": "FlinkJob", 
                  "jobName": "job1", 
                  "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", 
                  "jarName": "<JOB_JAR_NAME>", 
                  "action": "STOP", 
                  "entryClass": "<JOB_ENTRY_CLASS>", 
                  "flinkConfiguration": { 
                      "parallelism": "2", 
                      "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" 
                  }, 
                  "jobId": "20e9e907eb360b1c69510507f88cdb7b", 
                  "status": "STOPPED", 
                  "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", 
                  "actionResult": "SUCCESS", 
                  "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" 
          } 
       }
      ]
    }
    

注意

當任何動作正在進行中時,actionResult 會以 'IN_PROGRESS' 值表示它成功完成時,它會顯示 'SUCCESS',如果失敗,則會是 'FAILED'。

參考