共用方式為


使用 Livy API 來提交和執行工作作業

注意

適用於網狀架構 資料工程師 的 Livy API 處於預覽狀態。

適用於:✅Microsoft Fabric 中的 資料工程師 和 資料科學

使用適用於網狀架構的 Livy API 資料工程師 提交 Spark 批次作業。

必要條件

Livy API 會定義作業的統一端點。 當您遵循本文中的範例時,請將佔位元 {Entra_TenantID}、{Entra_ClientID}、{Fabric_WorkspaceID}和 {Fabric_LakehouseID} 取代為您適當的值。

設定 Livy API 會話的 Visual Studio Code

  1. 在 Fabric Lakehouse 中選取 [Lakehouse 設定 ]。

    顯示 Lakehouse 設定的螢幕快照。

  2. 流覽至 [Livy 端點 ] 區段。

    顯示 Lakehouse Livy 端點和工作階段作業 連接字串 的螢幕快照。

  3. 將會話作業 連接字串 (映射中的第一個紅色方塊) 複製到您的程式代碼。

  4. 流覽至 Microsoft Entra 系統管理中心 ,並將應用程式 (用戶端) 識別碼和目錄 (租使用者) 識別碼複製到您的程式碼。

    顯示 Microsoft Entra 系統管理中心中 Livy API 應用程式概觀的螢幕快照。

建立 Livy API Spark 工作階段

  1. 在 Visual Studio Code 中建立 .ipynb 筆記本,並插入下列程式代碼。

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. 執行筆記本數據格,瀏覽器中應該會出現彈出視窗,讓您選擇要用來登入的身分識別。

    顯示登入畫面至 Microsoft Entra 應用程式的螢幕快照。

  3. 選擇要登入的身分識別之後,系統也會要求您核准 Microsoft Entra 應用程式註冊 API 許可權。

    顯示Microsoft Entra 應用程式 API 許可權的螢幕快照。

  4. 完成驗證之後關閉瀏覽器視窗。

    顯示驗證完成的螢幕快照。

  5. 在 Visual Studio Code 中,您應該會看到傳回Microsoft Entra 令牌。

    此螢幕快照顯示執行儲存格和登入之後傳回Microsoft Entra 令牌。

  6. 新增另一個筆記本數據格,並插入此程序代碼。

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. 執行筆記本數據格,您應該會在建立 Livy 工作階段時看到一行列印。

    顯示第一個筆記本數據格執行結果的螢幕快照。

  8. 您可以使用 [在監視中樞檢視您的作業](#View 您在監視中樞的工作),來確認 Livy 會話是否已建立。

使用 Livy API Spark 工作階段提交spark.sql語句

  1. 新增另一個筆記本數據格,並插入此程序代碼。

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. 執行筆記本數據格,您應該會看到在提交作業並傳回結果時列印數條累加行。

    顯示執行Spark.sql第一個筆記本數據格結果的螢幕快照。

使用 Livy API Spark 工作階段提交第二個spark.sql語句

  1. 新增另一個筆記本數據格,並插入此程序代碼。

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. 執行筆記本數據格,您應該會看到在提交作業並傳回結果時列印數條累加行。

    顯示第二個筆記本數據格執行結果的螢幕快照。

使用第三個語句關閉 Livy 工作階段

  1. 新增另一個筆記本數據格,並插入此程序代碼。

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

在監視中樞檢視您的作業

您可以選取左側導覽連結中的 [監視],來存取監視中樞以檢視各種 Apache Spark 活動。

  1. 當工作階段進行中或處於已完成狀態時,您可以流覽至 [監視] 來檢視工作階段狀態。

    顯示監視中樞中先前 Livy API 提交的螢幕快照。

  2. 選取並開啟最新的活動名稱。

    顯示監視中樞內最新 Livy API 活動的螢幕快照。

  3. 在此 Livy API 工作階段案例中,您可以看到先前的工作階段提交、執行詳細數據、Spark 版本和設定。 請注意右上方的已停止狀態。

    顯示監視中樞內最新 Livy API 活動詳細數據的螢幕快照。

若要回顧整個程式,您需要遠端用戶端,例如 Visual StudioCode、Microsoft Entra 應用程式令牌、Livy API端點 URL、對 Lakehouse 的驗證,最後是會話 Livy API。