Sdílet prostřednictvím


Použití rozhraní Livy API k odesílání a spouštění úloh relací

Poznámka:

Rozhraní API Livy for Fabric Datoví technici ing je ve verzi Preview.

Platí pro:✅ Datoví technici ing a Datová Věda v Microsoft Fabric

Odešlete dávkové úlohy Sparku pomocí rozhraní API Livy pro prostředky infrastruktury Datoví technici.

Požadavky

Rozhraní Livy API definuje jednotný koncový bod pro operace. Zástupné symboly {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} a {Fabric_LakehouseID} nahraďte příslušnými hodnotami, když budete postupovat podle příkladů v tomto článku.

Konfigurace editoru Visual Studio Code pro relaci rozhraní Livy API

  1. Ve svém Fabric Lakehouse vyberte Nastavení Lakehouse.

    Snímek obrazovky s nastavením Lakehouse

  2. Přejděte do části Koncový bod Livy.

    Snímek obrazovky znázorňující koncový bod Lakehouse Livy a připojovací řetězec úlohy relace

  3. Zkopírujte úlohu relace připojovací řetězec (první červené pole na obrázku) do kódu.

  4. Přejděte do Centra pro správu Microsoft Entra a zkopírujte ID aplikace (klienta) i ID adresáře (tenanta) do kódu.

    Snímek obrazovky s přehledem aplikace Rozhraní API Livy v Centru pro správu Microsoft Entra

Vytvoření relace Sparku rozhraní Livy API

  1. Vytvořte poznámkový blok v editoru .ipynb Visual Studio Code a vložte následující kód.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. Spusťte buňku poznámkového bloku, měla by se v prohlížeči zobrazit automaticky otevíraná nabídka, která vám umožní zvolit identitu pro přihlášení.

    Snímek obrazovky s přihlašovací obrazovkou k aplikaci Microsoft Entra

  3. Jakmile zvolíte identitu pro přihlášení, budete také požádáni o schválení oprávnění rozhraní API pro registraci aplikací Microsoft Entra.

    Snímek obrazovky s oprávněními rozhraní API aplikace Microsoft Entra

  4. Po dokončení ověřování zavřete okno prohlížeče.

    Snímek obrazovky znázorňující dokončení ověřování

  5. V editoru Visual Studio Code by se měl vrátit token Microsoft Entra.

    Snímek obrazovky znázorňující token Microsoft Entra vrácený po spuštění buňky a přihlášení

  6. Přidejte další buňku poznámkového bloku a vložte tento kód.

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. Spusťte buňku poznámkového bloku. Při vytváření relace Livy by se měla zobrazit jedna čára.

    Snímek obrazovky znázorňující výsledky prvního spuštění buňky poznámkového bloku

  8. Relaci Livy můžete ověřit pomocí centra monitorování [Zobrazit úlohy v centru monitorování](#View úlohy v centru monitorování).

Odeslání příkazu spark.sql pomocí relace Sparku rozhraní Api Livy

  1. Přidejte další buňku poznámkového bloku a vložte tento kód.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Spusťte buňku poznámkového bloku. Při odeslání úlohy a vrácených výsledků by se mělo zobrazit několik přírůstkových řádků.

    Snímek obrazovky zobrazující výsledky první buňky poznámkového bloku s Spark.sql spuštěním

Odeslání druhého příkazu spark.sql pomocí relace Sparku rozhraní Livy API

  1. Přidejte další buňku poznámkového bloku a vložte tento kód.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Spusťte buňku poznámkového bloku. Při odeslání úlohy a vrácených výsledků by se mělo zobrazit několik přírůstkových řádků.

    Snímek obrazovky znázorňující výsledky spuštění druhé buňky poznámkového bloku

Zavřete relaci Livy pomocí třetího příkazu.

  1. Přidejte další buňku poznámkového bloku a vložte tento kód.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

Zobrazení úloh v centru monitorování

K centru monitorování se dostanete tak, že vyberete Možnost Monitorování v navigačních odkazech na levé straně a zobrazíte různé aktivity Apache Sparku.

  1. Když relace probíhá nebo je dokončená, můžete stav relace zobrazit tak, že přejdete na Monitorování.

    Snímek obrazovky znázorňující předchozí odeslání rozhraní Api Livy v centru monitorování

  2. Vyberte a otevřete název poslední aktivity.

    Snímek obrazovky zobrazující nejnovější aktivitu rozhraní Livy API v centru monitorování

  3. V tomto případě relace rozhraní Livy API uvidíte předchozí odeslání relací, podrobnosti o spuštění, verze Sparku a konfiguraci. Všimněte si stavu zastavení v pravém horním rohu.

    snímek obrazovky s nejnovějšími podrobnostmi o aktivitě rozhraní Livy API v centru monitorování

K rekapitulace celého procesu potřebujete vzdáleného klienta, jako je Visual Studio Code, token aplikace Microsoft Entra, adresa URL koncového bodu rozhraní Livy API, ověřování vůči vaší službě Lakehouse a nakonec rozhraní API Session Livy.