Použití rozhraní Livy API k odesílání a spouštění úloh relací
Poznámka:
Rozhraní API Livy for Fabric Datoví technici ing je ve verzi Preview.
Platí pro:✅ Datoví technici ing a Datová Věda v Microsoft Fabric
Odešlete dávkové úlohy Sparku pomocí rozhraní API Livy pro prostředky infrastruktury Datoví technici.
Požadavky
Kapacita Fabric Premium nebo zkušební verze se službou Lakehouse
Vzdálený klient, jako je Visual Studio Code s jupyter Notebooks, PySpark a knihovnou MSAL (Microsoft Authentication Library) pro Python
Pro přístup k rozhraní REST API fabric se vyžaduje token aplikace Microsoft Entra. Zaregistrujte aplikaci na platformě Microsoft Identity Platform.
Některá data ve vašem jezeře, tento příklad používá NYC Taxi & Limousine Commission green_tripdata_2022_08 parketový soubor načtený do jezera.
Rozhraní Livy API definuje jednotný koncový bod pro operace. Zástupné symboly {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} a {Fabric_LakehouseID} nahraďte příslušnými hodnotami, když budete postupovat podle příkladů v tomto článku.
Konfigurace editoru Visual Studio Code pro relaci rozhraní Livy API
Ve svém Fabric Lakehouse vyberte Nastavení Lakehouse.
Přejděte do části Koncový bod Livy.
Zkopírujte úlohu relace připojovací řetězec (první červené pole na obrázku) do kódu.
Přejděte do Centra pro správu Microsoft Entra a zkopírujte ID aplikace (klienta) i ID adresáře (tenanta) do kódu.
Vytvoření relace Sparku rozhraní Livy API
Vytvořte poznámkový blok v editoru
.ipynb
Visual Studio Code a vložte následující kód.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
Spusťte buňku poznámkového bloku, měla by se v prohlížeči zobrazit automaticky otevíraná nabídka, která vám umožní zvolit identitu pro přihlášení.
Jakmile zvolíte identitu pro přihlášení, budete také požádáni o schválení oprávnění rozhraní API pro registraci aplikací Microsoft Entra.
Po dokončení ověřování zavřete okno prohlížeče.
V editoru Visual Studio Code by se měl vrátit token Microsoft Entra.
Přidejte další buňku poznámkového bloku a vložte tento kód.
create_livy_session = requests.post(livy_base_url, headers=headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json())
Spusťte buňku poznámkového bloku. Při vytváření relace Livy by se měla zobrazit jedna čára.
Relaci Livy můžete ověřit pomocí centra monitorování [Zobrazit úlohy v centru monitorování](#View úlohy v centru monitorování).
Odeslání příkazu spark.sql pomocí relace Sparku rozhraní Api Livy
Přidejte další buňku poznámkového bloku a vložte tento kód.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Spusťte buňku poznámkového bloku. Při odeslání úlohy a vrácených výsledků by se mělo zobrazit několik přírůstkových řádků.
Odeslání druhého příkazu spark.sql pomocí relace Sparku rozhraní Livy API
Přidejte další buňku poznámkového bloku a vložte tento kód.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Spusťte buňku poznámkového bloku. Při odeslání úlohy a vrácených výsledků by se mělo zobrazit několik přírůstkových řádků.
Zavřete relaci Livy pomocí třetího příkazu.
Přidejte další buňku poznámkového bloku a vložte tento kód.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, headers=headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers=headers) print (response)
Zobrazení úloh v centru monitorování
K centru monitorování se dostanete tak, že vyberete Možnost Monitorování v navigačních odkazech na levé straně a zobrazíte různé aktivity Apache Sparku.
Když relace probíhá nebo je dokončená, můžete stav relace zobrazit tak, že přejdete na Monitorování.
Vyberte a otevřete název poslední aktivity.
V tomto případě relace rozhraní Livy API uvidíte předchozí odeslání relací, podrobnosti o spuštění, verze Sparku a konfiguraci. Všimněte si stavu zastavení v pravém horním rohu.
K rekapitulace celého procesu potřebujete vzdáleného klienta, jako je Visual Studio Code, token aplikace Microsoft Entra, adresa URL koncového bodu rozhraní Livy API, ověřování vůči vaší službě Lakehouse a nakonec rozhraní API Session Livy.