Sdílet prostřednictvím


Použití rozhraní Livy API k odesílání a spouštění dávkových úloh Livy

Poznámka:

Rozhraní API Livy for Fabric Datoví technici ing je ve verzi Preview.

Platí pro:✅ Datoví technici ing a Datová Věda v Microsoft Fabric

Odešlete dávkové úlohy Sparku pomocí rozhraní API Livy pro prostředky infrastruktury Datoví technici.

Požadavky

Rozhraní Livy API definuje jednotný koncový bod pro operace. Zástupné symboly {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} a {Fabric_LakehouseID} nahraďte příslušnými hodnotami, když budete postupovat podle příkladů v tomto článku.

Konfigurace editoru Visual Studio Code pro batch rozhraní Livy API

  1. Ve svém Fabric Lakehouse vyberte Nastavení Lakehouse.

    Snímek obrazovky s nastavením Lakehouse

  2. Přejděte do části Koncový bod Livy.

    Snímek obrazovky znázorňující koncový bod Lakehouse Livy a připojovací řetězec úlohy relace

  3. Zkopírujte úlohu Batch připojovací řetězec (druhé červené pole na obrázku) do kódu.

  4. Přejděte do Centra pro správu Microsoft Entra a zkopírujte ID aplikace (klienta) i ID adresáře (tenanta) do kódu.

    Snímek obrazovky s přehledem aplikace Rozhraní API Livy v Centru pro správu Microsoft Entra

Vytvoření datové části Sparku a nahrání do lakehouse

  1. Vytvoření poznámkového bloku v editoru .ipynb Visual Studio Code a vložení následujícího kódu

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Uložte soubor Pythonu místně. Tato datová část kódu Pythonu obsahuje dva příkazy Sparku, které pracují s daty v Lakehouse a je potřeba je nahrát do vašeho Lakehouse. Budete potřebovat cestu ABFS zátěže, abyste ji mohli uvést v dávkové úloze rozhraní API Livy ve Visual Studio Code, a také název tabulky Lakehouse v příkazu SQL SELECT.

    Snímek obrazovky znázorňující buňku datové části Pythonu

  3. Nahrajte datovou část Pythonu do části soubory vašeho Lakehouse. >Načíst soubory > nahrání dat > klikněte do pole Soubory nebo vstup.

    Snímek obrazovky znázorňující datovou část v části Soubory v Lakehouse

  4. Jakmile je soubor v části Soubory vašeho Lakehouse, klikněte na tři tečky napravo od názvu souboru datové části a vyberte Vlastnosti.

    Snímek obrazovky znázorňující cestu ABFS datové části v vlastnosti souboru v Lakehouse

  5. Zkopírujte tuto cestu ABFS do buňky poznámkového bloku v kroku 1.

Vytvoření dávkové relace sparkového rozhraní API Livy

  1. Vytvořte poznámkový blok v editoru .ipynb Visual Studio Code a vložte následující kód.

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. Spusťte buňku poznámkového bloku, měla by se v prohlížeči zobrazit automaticky otevíraná nabídka, která vám umožní zvolit identitu pro přihlášení.

    Snímek obrazovky s přihlašovací obrazovkou k aplikaci Microsoft Entra

  3. Jakmile zvolíte identitu pro přihlášení, budete také požádáni o schválení oprávnění rozhraní API pro registraci aplikací Microsoft Entra.

    Snímek obrazovky s oprávněními rozhraní API aplikace Microsoft Entra

  4. Po dokončení ověřování zavřete okno prohlížeče.

    Snímek obrazovky znázorňující dokončení ověřování

  5. V editoru Visual Studio Code byste měli vidět vrácený token Microsoft Entra.

    Snímek obrazovky znázorňující token Microsoft Entra vrácený po spuštění buňky a přihlášení

  6. Přidejte další buňku poznámkového bloku a vložte tento kód.

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. Spusťte buňku poznámkového bloku. Při vytváření dávkové úlohy Livy by se měly zobrazit dva řádky.

    Snímek obrazovky znázorňující výsledky vytvoření dávkové relace

Odeslání příkazu spark.sql pomocí dávkové relace rozhraní Livy API

  1. Přidejte další buňku poznámkového bloku a vložte tento kód.

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. Spusťte buňku poznámkového bloku. Při vytváření a spuštění úlohy Livy Batch by se mělo zobrazit několik řádků.

    Snímek obrazovky zobrazující výsledky v editoru Visual Studio Code po úspěšném odeslání dávkové úlohy Livy

  3. Vraťte se do svého Lakehouse a zobrazte změny.

Zobrazení úloh v centru monitorování

K centru monitorování se dostanete tak, že vyberete Možnost Monitorování v navigačních odkazech na levé straně a zobrazíte různé aktivity Apache Sparku.

  1. Po dokončení dávkové úlohy můžete stav relace zobrazit tak, že přejdete na Monitorování.

    Snímek obrazovky znázorňující předchozí odeslání rozhraní Api Livy v centru monitorování

  2. Vyberte a otevřete název poslední aktivity.

    Snímek obrazovky zobrazující nejnovější aktivitu rozhraní Livy API v centru monitorování

  3. V tomto případě relace rozhraní Livy API uvidíte předchozí dávkové odeslání, podrobnosti o spuštění, verze Sparku a konfiguraci. Všimněte si stavu zastavení v pravém horním rohu.

    snímek obrazovky s nejnovějšími podrobnostmi o aktivitě rozhraní Livy API v centru monitorování

K rekapitulace celého procesu potřebujete vzdáleného klienta, jako je Visual Studio Code, token aplikace Microsoft Entra, adresa URL koncového bodu rozhraní Livy API, ověřování vůči vaší službě Lakehouse, datová část Sparku ve vašem Lakehouse a nakonec dávková relace rozhraní Livy API.