次の方法で共有


Livy API を使用してセッション ジョブを送信して実行する

Note

Livy API for Fabric Data Engineering はプレビュー段階です。

適用対象:✅ Microsoft Fabric でのデータ エンジニアリングとデータ サイエンス

Livy API for Fabric Data Engineering を使用して Spark バッチ ジョブを送信します。

前提条件

Livy API で、操作用に統合エンドポイントを定義します。 この記事の例に沿って進める際は、プレースホルダー {Entra_TenantID}、{Entra_ClientID}、{Fabric_WorkspaceID}、{Fabric_LakehouseID} を適切な値に置き換えてください。

Livy API セッションの Visual Studio Code を構成する

  1. Fabric Lakehouse で [Lakehouse の設定] を選択します。

    Lakehouse の設定を示すスクリーンショット。

  2. [Livy エンドポイント] セクションに移動します。

    Lakehouse Livy エンドポイントとセッション ジョブの接続文字列を示すスクリーンショット。

  3. セッション ジョブの接続文字列 (画像内の 1 つ目の赤い枠) をコードにコピーします。

  4. Microsoft Entra 管理センターに移動し、アプリケーション (クライアント) ID とディレクトリ (テナント) ID の両方をコードにコピーします。

    Microsoft Entra 管理センターの Livy API アプリの概要を示すスクリーンショット。

Livy API Spark セッションを作成する

  1. Visual Studio Code で .ipynb ノートブックを作成し、次のコードを挿入します。

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. ノートブック セルを実行すると、ブラウザーにポップアップが表示され、サインインに使用する ID を選択できます。

    Microsoft Entra アプリへのログオン画面を示すスクリーンショット。

  3. サインインに使用する ID を選択すると、Microsoft Entra アプリ登録 API へのアクセス許可の承認も求められます。

    Microsoft Entra アプリ API に対するアクセス許可を示すスクリーンショット。

  4. 認証が完了したら、ブラウザー ウィンドウを閉じます。

    認証の完了を示すスクリーンショット。

  5. Visual Studio Code で、Microsoft Entra トークンが返されていることがわかります。

    セルの実行とログイン後に返された Microsoft Entra トークンを示すスクリーンショット。

  6. 別のノートブック セルを追加し、このコードを挿入します。

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. ノートブック セルを実行すると、Livy セッションの作成時に出力された 1 行が表示されます。

    最初のノートブック セルの実行結果を示すスクリーンショット。

  8. [監視ハブでジョブを表示する](#監視ハブでジョブを表示する) を使用して、Livy セッションが作成されたことを確認できます。

Livy API Spark セッションを使用して spark.sql ステートメントを送信する

  1. 別のノートブック セルを追加し、このコードを挿入します。

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. ノートブック セルを実行して、ジョブが送信され、結果が返されると、複数の増分行が出力されます。

    Spark.sql の実行による最初のノートブック セルの結果を示すスクリーンショット。

Livy API Spark セッションを使用して 2 つ目の spark.sql ステートメントを送信する

  1. 別のノートブック セルを追加し、このコードを挿入します。

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. ノートブック セルを実行して、ジョブが送信され、結果が返されると、複数の増分行が出力されます。

    2 つ目のノートブック セルの実行結果を示すスクリーンショット。

3 つ目のステートメントで Livy セッションを終了する

  1. 別のノートブック セルを追加し、このコードを挿入します。

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

監視ハブでジョブを表示する

左側のナビゲーション リンクの [監視] を選択して監視ハブにアクセスすると、さまざまな Apache Spark アクティビティを表示できます。

  1. セッションが進行中または完了状態の場合、[モニター] に移動してセッションの状態を表示できます。

    監視ハブでの以前の Livy API の送信を示すスクリーンショット。

  2. 最新のアクティビティ名を選択して開きます。

    監視ハブの最新の Livy API アクティビティを示すスクリーンショット。

  3. この Livy API セッションのケースでは、以前のセッション送信、実行の詳細、Spark のバージョン、構成を確認できます。 右上の停止状態に注目してください。

    監視ハブの最新の Livy API アクティビティの詳細を示すスクリーンショット。

プロセス全体をまとめると、Visual Studio Code などのリモート クライアント、Microsoft Entra アプリ トークン、Livy API エンドポイント URL、レイクハウスに対する認証、そして最後に Session Livy API が必要です。