Udostępnij za pośrednictwem


Przesyłanie i wykonywanie zadań wsadowych usługi Livy przy użyciu interfejsu API usługi Livy

Uwaga

Interfejs API usługi Livy dla usługi Fabric inżynierowie danych jest w wersji zapoznawczej.

Dotyczy:✅ inżynierowie danych i Nauka o danych w usłudze Microsoft Fabric

Przesyłanie zadań wsadowych platformy Spark przy użyciu interfejsu API usługi Livy for Fabric inżynierowie danych ing.

Wymagania wstępne

Interfejs API usługi Livy definiuje ujednolicony punkt końcowy dla operacji. Zastąp symbole zastępcze {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} i {Fabric_LakehouseID} odpowiednimi wartościami, korzystając z przykładów w tym artykule.

Konfigurowanie programu Visual Studio Code dla usługi Livy API Batch

  1. Wybierz pozycję Lakehouse Settings (Ustawienia usługi Lakehouse) w usłudze Fabric Lakehouse.

    Zrzut ekranu przedstawiający ustawienia usługi Lakehouse.

  2. Przejdź do sekcji Punkt końcowy usługi Livy.

    Zrzut ekranu przedstawiający parametry połączenia punktu końcowego usługi Lakehouse Livy i zadania sesji.

  3. Skopiuj zadanie usługi Batch parametry połączenia (drugie czerwone pole na obrazie) do kodu.

  4. Przejdź do centrum administracyjnego firmy Microsoft Entra i skopiuj identyfikator aplikacji (klienta) i identyfikator katalogu (dzierżawy) do kodu.

    Zrzut ekranu przedstawiający przegląd aplikacji interfejsu API usługi Livy w centrum administracyjnym firmy Microsoft Entra.

Tworzenie ładunku platformy Spark i przekazywanie go do usługi Lakehouse

  1. Tworzenie notesu .ipynb w programie Visual Studio Code i wstawianie następującego kodu

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Zapisz plik w języku Python lokalnie. Ten ładunek kodu w języku Python zawiera dwie instrukcje platformy Spark, które działają na danych w usłudze Lakehouse i muszą zostać przekazane do usługi Lakehouse. Ścieżka ABFS danych będzie potrzebna do referencji w zadaniu przetwarzania wsadowego API Livy w programie Visual Studio Code oraz nazwa tabeli Lakehouse w instrukcji SQL SELECT.

    Zrzut ekranu przedstawiający komórkę ładunku języka Python.

  3. Przekaż ładunek języka Python do sekcji plików usługi Lakehouse. > Pobierz dane > Przekaż pliki > kliknij w polu Pliki/dane wejściowe.

    Zrzut ekranu przedstawiający ładunek w sekcji Pliki w usłudze Lakehouse.

  4. Gdy plik znajduje się w sekcji Pliki usługi Lakehouse, kliknij trzy kropki po prawej stronie nazwy pliku ładunku i wybierz pozycję Właściwości.

    Zrzut ekranu przedstawiający ścieżkę ładunku ABFS we właściwościach pliku w usłudze Lakehouse.

  5. Skopiuj tę ścieżkę ABFS do komórki notesu w kroku 1.

Tworzenie sesji wsadowej interfejsu API usługi Livy Spark

  1. .ipynb Utwórz notes w programie Visual Studio Code i wstaw następujący kod.

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. Uruchom komórkę notesu. W przeglądarce powinno pojawić się okno podręczne umożliwiające wybranie tożsamości do zalogowania się.

    Zrzut ekranu przedstawiający ekran logowania do aplikacji Microsoft Entra.

  3. Po wybraniu tożsamości do logowania się zostanie również wyświetlony monit o zatwierdzenie uprawnień interfejsu API rejestracji aplikacji Firmy Microsoft Entra.

    Zrzut ekranu przedstawiający uprawnienia interfejsu API aplikacji Entra firmy Microsoft.

  4. Zamknij okno przeglądarki po zakończeniu uwierzytelniania.

    Zrzut ekranu przedstawiający ukończone uwierzytelnianie.

  5. W programie Visual Studio Code powinien zostać zwrócony token Entra firmy Microsoft.

    Zrzut ekranu przedstawiający token Entra firmy Microsoft zwrócony po uruchomieniu komórki i zalogowaniu.

  6. Dodaj kolejną komórkę notesu i wstaw ten kod.

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. Uruchom komórkę notesu. Powinny zostać wyświetlone dwa wiersze wydrukowane podczas tworzenia zadania wsadowego usługi Livy.

    Zrzut ekranu przedstawiający wyniki tworzenia sesji wsadowej.

Przesyłanie instrukcji spark.sql przy użyciu sesji wsadowej interfejsu API usługi Livy

  1. Dodaj kolejną komórkę notesu i wstaw ten kod.

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. Uruchom komórkę notesu. Po utworzeniu i uruchomieniu zadania usługi Livy Batch powinno zostać wyświetlonych kilka wierszy.

    Zrzut ekranu przedstawiający wyniki w programie Visual Studio Code po pomyślnym przesłaniu zadania usługi Livy Batch.

  3. Wróć do usługi Lakehouse, aby zobaczyć zmiany.

Wyświetlanie zadań w centrum monitorowania

Aby wyświetlić różne działania platformy Apache Spark, możesz uzyskać dostęp do centrum monitorowania, wybierając pozycję Monitoruj w linkach nawigacji po lewej stronie.

  1. Po zakończeniu zadania wsadowego możesz wyświetlić stan sesji, przechodząc do pozycji Monitor.

    Zrzut ekranu przedstawiający poprzednie przesyłania interfejsu API usługi Livy w centrum monitorowania.

  2. Wybierz i otwórz najnowszą nazwę działania.

    Zrzut ekranu przedstawiający najnowsze działanie interfejsu API usługi Livy w centrum monitorowania.

  3. W tym przypadku sesji interfejsu API usługi Livy możesz zobaczyć poprzednie przesyłanie wsadowe, szczegóły uruchomienia, wersje platformy Spark i konfigurację. Zwróć uwagę na zatrzymany stan w prawym górnym rogu.

    Zrzut ekranu przedstawiający najnowsze szczegóły działania interfejsu API usługi Livy w centrum monitorowania.

Aby podsumować cały proces, potrzebny jest klient zdalny, taki jak Visual Studio Code, token aplikacji Microsoft Entra, adres URL punktu końcowego interfejsu API usługi Livy, uwierzytelnianie względem usługi Lakehouse, ładunek spark w usłudze Lakehouse, a na koniec sesja interfejsu API usługi Livy w usłudze Batch.