Delen via


De Livy-API gebruiken om Livy-batchtaken te verzenden en uit te voeren

Notitie

De Livy-API voor Fabric Data-engineer ing is in preview.

Van toepassing op:✅ Data-engineer ing en Datawetenschap in Microsoft Fabric

Verzend Spark-batchtaken met behulp van de Livy-API voor Fabric Data-engineer ing.

Vereisten

De Livy-API definieert een uniform eindpunt voor bewerkingen. Vervang de tijdelijke aanduidingen {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} en {Fabric_LakehouseID} door de juiste waarden wanneer u de voorbeelden in dit artikel volgt.

Visual Studio Code configureren voor uw Livy API Batch

  1. Selecteer Lakehouse-instellingen in uw Fabric Lakehouse.

    Schermopname van Lakehouse-instellingen.

  2. Navigeer naar de sectie Livy-eindpunt .

    schermopname van het Lakehouse Livy-eindpunt en de sessietaak verbindingsreeks.

  3. Kopieer de Batch-taak verbindingsreeks (tweede rood vak in de afbeelding) naar uw code.

  4. Navigeer naar het Microsoft Entra-beheercentrum en kopieer zowel de toepassings-id (client-id) als de map-id (tenant) naar uw code.

    Schermopname van het overzicht van de Livy-API-app in het Microsoft Entra-beheercentrum.

Een Spark-nettolading maken en uploaden naar uw Lakehouse

  1. .ipynb Een notebook maken in Visual Studio Code en de volgende code invoegen

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Sla het Python-bestand lokaal op. Deze Python-codepayload bevat twee Spark-instructies die werken aan gegevens in een Lakehouse en moeten worden geüpload naar uw Lakehouse. U hebt het ABFS-pad van de payload nodig om te verwijzen naar in uw Livy API-batchtaak in Visual Studio Code, en de naam van uw Lakehouse-tabel in de SQL SELECT-instructie.

    Schermopname van de Python-nettoladingcel.

  3. Upload de Python-nettolading naar de bestandssectie van uw Lakehouse. > Download bestanden > uploaden > klik in het vak Bestanden/invoer.

    Schermopname van nettolading in de sectie Bestanden van Lakehouse.

  4. Nadat het bestand zich in de sectie Bestanden van uw Lakehouse bevindt, klikt u op de drie puntjes rechts van de bestandsnaam van de nettolading en selecteert u Eigenschappen.

    Schermopname van het ABFS-pad nettolading in de eigenschappen van het bestand in Lakehouse.

  5. Kopieer dit ABFS-pad naar uw notebookcel in stap 1.

Een Livy API Spark-batchsessie maken

  1. Maak een .ipynb notebook in Visual Studio Code en voeg de volgende code in.

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. Voer de notebookcel uit. Er moet een pop-up worden weergegeven in uw browser, zodat u de identiteit kunt kiezen waarmee u zich kunt aanmelden.

    Schermopname van het aanmeldingsscherm voor de Microsoft Entra-app.

  3. Nadat u de identiteit hebt gekozen waarmee u zich wilt aanmelden, wordt u ook gevraagd om de API-machtigingen voor de registratie-API voor Microsoft Entra-apps goed te keuren.

    Schermopname van API-machtigingen voor Microsoft Entra-apps.

  4. Sluit het browservenster nadat u de verificatie hebt voltooid.

    Schermopname van het voltooien van de verificatie.

  5. In Visual Studio Code zou het Microsoft Entra-token moeten worden geretourneerd.

    Schermopname van het Microsoft Entra-token dat is geretourneerd nadat u een cel hebt uitgevoerd en zich hebt aangemeld.

  6. Voeg nog een notebookcel toe en voeg deze code in.

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. Voer de notebookcel uit. U ziet dat er twee regels worden afgedrukt terwijl de Livy-batchtaak wordt gemaakt.

    Schermopname van de resultaten van het maken van een batchsessie.

Een spark.sql-instructie verzenden met behulp van de Livy API-batchsessie

  1. Voeg nog een notebookcel toe en voeg deze code in.

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. Voer de notebookcel uit. U ziet dat er verschillende regels worden afgedrukt terwijl de Livy Batch-taak wordt gemaakt en uitgevoerd.

    Schermopname van de resultaten in Visual Studio Code nadat de Livy Batch-taak is verzonden.

  3. Ga terug naar uw Lakehouse om de wijzigingen te bekijken.

Uw taken weergeven in de Bewakingshub

U hebt toegang tot de Bewakingshub om verschillende Apache Spark-activiteiten weer te geven door Monitor te selecteren in de navigatiekoppelingen aan de linkerkant.

  1. Wanneer de batchtaak is voltooid, kunt u de sessiestatus bekijken door naar Monitor te navigeren.

    Schermopname van eerdere Livy API-inzendingen in de Monitoring Hub.

  2. Selecteer en open de naam van de meest recente activiteit.

    Schermopname van de meest recente Livy-API-activiteit in de Bewakingshub.

  3. In dit geval van livy-API-sessie kunt u uw vorige batchinzending zien, details uitvoeren, Spark-versies en configuratie. Let op de gestopte status rechtsboven.

    Schermopname met de meest recente details van de Livy-API-activiteit in de Bewakingshub.

Als u het hele proces wilt terugvatten, hebt u een externe client nodig, zoals Visual Studio Code, een Microsoft Entra-app-token, de URL van het Livy API-eindpunt, verificatie voor uw Lakehouse, een Spark-nettolading in uw Lakehouse en ten slotte een batch-Livy API-sessie.