Dela via


Använda Livy-API:et för att skicka och köra sessionsjobb

Kommentar

Livy API för Fabric Dataingenjör ing finns i förhandsversion.

Gäller för:✅ Dataingenjör ing och Datavetenskap i Microsoft Fabric

Skicka Spark-batchjobb med livy-API:et för Fabric-Dataingenjör ing.

Förutsättningar

Livy-API:et definierar en enhetlig slutpunkt för åtgärder. Ersätt platshållarna {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} och {Fabric_LakehouseID} med lämpliga värden när du följer exemplen i den här artikeln.

Konfigurera Visual Studio Code för din Livy API-session

  1. Välj Lakehouse-inställningar i din Fabric Lakehouse.

    Skärmbild som visar Lakehouse-inställningar.

  2. Gå till avsnittet Livy-slutpunkt .

    skärmbild som visar Lakehouse Livy-slutpunkten och sessionsjobbet anslutningssträng.

  3. Kopiera sessionsjobbet anslutningssträng (den första röda rutan i bilden) till koden.

  4. Gå till administrationscentret för Microsoft Entra och kopiera både program-ID:t (klient-) och katalog-ID:t (klientorganisation) till din kod.

    Skärmbild som visar Översikt över Livy API-appen i administrationscentret för Microsoft Entra.

Skapa en Livy API Spark-session

  1. Skapa en .ipynb notebook-fil i Visual Studio Code och infoga följande kod.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. Kör notebook-cellen. Ett popup-fönster bör visas i webbläsaren så att du kan välja den identitet som du vill logga in med.

    Skärmbild som visar inloggningsskärmen i Microsoft Entra-appen.

  3. När du har valt den identitet som du vill logga in med uppmanas du också att godkänna API-behörigheterna för Microsoft Entra-appregistrering.

    Skärmbild som visar api-behörigheter för Microsoft Entra-appar.

  4. Stäng webbläsarfönstret när autentiseringen har slutförts.

    Skärmbild som visar att autentiseringen är klar.

  5. I Visual Studio Code bör du se att Microsoft Entra-token returneras.

    Skärmbild som visar den Microsoft Entra-token som returnerades efter att cellen har körts och loggat in.

  6. Lägg till ytterligare en notebook-cell och infoga den här koden.

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. Kör notebook-cellen. Du bör se en rad som skrivs ut när Livy-sessionen skapas.

    Skärmbild som visar resultatet av den första notebook-cellkörningen.

  8. Du kan kontrollera att Livy-sessionen har skapats med hjälp av [Visa dina jobb i övervakningshubben](#View dina jobb i övervakningshubben).

Skicka en spark.sql-instruktion med Livy API Spark-sessionen

  1. Lägg till ytterligare en notebook-cell och infoga den här koden.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Kör notebook-cellen. Du bör se flera inkrementella rader som skrivs ut när jobbet skickas och resultatet returneras.

    Skärmbild som visar resultatet av den första notebook-cellen med Spark.sql körning.

Skicka en andra spark.sql-instruktion med Livy API Spark-sessionen

  1. Lägg till ytterligare en notebook-cell och infoga den här koden.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Kör notebook-cellen. Du bör se flera inkrementella rader som skrivs ut när jobbet skickas och resultatet returneras.

    Skärmbild som visar resultatet av den andra notebook-cellkörningen.

Stäng Livy-sessionen med en tredje instruktion

  1. Lägg till ytterligare en notebook-cell och infoga den här koden.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

Visa dina jobb i övervakningshubben

Du kan komma åt övervakningshubben för att visa olika Apache Spark-aktiviteter genom att välja Övervaka i navigeringslänkarna till vänster.

  1. När sessionen pågår eller är i slutfört tillstånd kan du visa sessionsstatusen genom att gå till Övervaka.

    Skärmbild som visar tidigare Livy API-inlämningar i övervakningshubben.

  2. Välj och öppna det senaste aktivitetsnamnet.

    Skärmbild som visar den senaste Livy API-aktiviteten i övervakningshubben.

  3. I det här Livy API-sessionsfallet kan du se dina tidigare sessioner, körningsinformation, Spark-versioner och konfiguration. Observera den stoppade statusen längst upp till höger.

    Skärmbild som visar den senaste livy-API-aktivitetsinformationen i övervakningshubben.

För att sammanfatta hela processen behöver du en fjärrklient som Visual Studio Code, en Microsoft Entra-apptoken, Livy API-slutpunkts-URL, autentisering mot Lakehouse och slutligen ett Livy-API för session.