Del via


Brug Livy-API'en til at sende og udføre Livy-batchjob

Bemærk

Livy API til Fabric Dataudvikler ing er en prøveversion.

Gælder for:✅ Dataudvikler ing og datavidenskab i Microsoft Fabric

Indsend Spark-batchjob ved hjælp af Livy-API'en til Fabric Dataudvikler ing.

Forudsætninger

Livy-API'en definerer et samlet slutpunkt for handlinger. Erstat pladsholderne {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} og {Fabric_LakehouseID} med de relevante værdier, når du følger eksemplerne i denne artikel.

Konfigurer Visual Studio Code for dit Livy API-batch

  1. Vælg Lakehouse-indstillinger i Fabric Lakehouse.

    Skærmbillede, der viser indstillingerne for Lakehouse.

  2. Gå til sektionen Livy-slutpunkt .

    skærmbillede, der viser Lakehouse Livy-slutpunktet og sessionsjobbet forbindelsesstreng.

  3. Kopiér batchjobbet forbindelsesstreng (det andet røde felt på billedet) til din kode.

  4. Gå til Microsoft Entra Administration , og kopiér både program-id'et (klient)-id'et og mappe-id'et (lejer) til din kode.

    Skærmbillede, der viser oversigt over Livy API-app i Microsoft Entra Administration.

Opret en Spark-nyttedata, og upload den til lakehouse

  1. Opret en .ipynb notesbog i Visual Studio Code, og indsæt følgende kode

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Gem Python-filen lokalt. Denne Python-kodedata indeholder to Spark-sætninger, der fungerer på data i et Lakehouse og skal uploades til dit Lakehouse. Du skal bruge ABFS-stien for nyttedataene for at referere til den i dit Livy API-batchjob i Visual Studio Code og navnet på tabellen Lakehouse i Select SQL-sætningen.

    Skærmbillede, der viser Python-nyttedatacellen.

  3. Upload Python-nyttedataene til filafsnittet i lakehouse. >Hent filer til overførsel af > data > klik i feltet Filer/input.

    Skærmbillede, der viser nyttedata i afsnittet Filer i Lakehouse.

  4. Når filen er i afsnittet Filer i Lakehouse, skal du klikke på de tre prikker til højre for dit payloadfilnavn og vælge Egenskaber.

    Skærmbillede, der viser ABFS-stien til nyttedata i egenskaberne for filen i Lakehouse.

  5. Kopiér denne ABFS-sti til din notesbogcelle i trin 1.

Opret en Livy API Spark-batchsession

  1. Opret en .ipynb notesbog i Visual Studio Code, og indsæt følgende kode.

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. Kør notesbogcellen. Der vises et pop op-vindue i browseren, så du kan vælge den identitet, du vil logge på med.

    Skærmbillede, der viser logonskærmen til Microsoft Entra-appen.

  3. Når du har valgt den identitet, du vil logge på med, bliver du også bedt om at godkende API-tilladelserne til Microsoft Entra-appregistrering.

    Skærmbillede, der viser API-tilladelser til Microsoft Entra-appen.

  4. Luk browservinduet, når godkendelsen er fuldført.

    Skærmbillede, der viser, at godkendelsen er fuldført.

  5. I Visual Studio Code kan du se, at Microsoft Entra-tokenet returneres.

    Skærmbillede, der viser Det Microsoft Entra-token, der returneres efter at have kørt celle og logget på.

  6. Tilføj en anden notesbogcelle, og indsæt denne kode.

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. Kør notesbogcellen. Du kan se to linjer udskrevet, når livy-batchjobbet oprettes.

    Skærmbillede, der viser resultaterne af oprettelsen af batchsessionen.

Send en spark.sql-sætning ved hjælp af Livy API-batchsessionen

  1. Tilføj en anden notesbogcelle, og indsæt denne kode.

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. Kør notesbogcellen. Du kan se flere linjer udskrevet, når Livy-batchjobbet oprettes og køres.

    Skærmbillede, der viser resultater i Visual Studio Code, når Livy-batchjobbet er sendt.

  3. Gå tilbage til lakehouse for at se ændringerne.

Få vist dine job i overvågningshubben

Du kan få adgang til overvågningshubben for at få vist forskellige Apache Spark-aktiviteter ved at vælge Overvåg i navigationslinkene til venstre.

  1. Når batchjobbet er fuldført, kan du få vist sessionsstatussen ved at gå til Overvågning.

    Skærmbillede, der viser tidligere Livy API-indsendelser i overvågningshubben.

  2. Vælg og åbn det seneste aktivitetsnavn.

    Skærmbillede, der viser den seneste Livy API-aktivitet i overvågningshubben.

  3. I denne Livy API-session kan du se din tidligere batchafsendelse, køre detaljer, Spark-versioner og konfiguration. Læg mærke til den stoppet status øverst til højre.

    Skærmbillede, der viser de seneste Livy API-aktivitetsoplysninger i overvågningshubben.

Hvis du vil opsummere hele processen, skal du bruge en fjernklient, f.eks . Visual Studio Code, et Microsoft Entra-app-token, URL-adressen til Livy API-slutpunktet, godkendelse mod lakehouse, en Spark-nyttedata i dit Lakehouse og endelig en batch Livy-API-session.