Udostępnij za pośrednictwem


Przesyłanie i wykonywanie zadań sesji przy użyciu interfejsu API usługi Livy

Uwaga

Interfejs API usługi Livy dla usługi Fabric inżynierowie danych jest w wersji zapoznawczej.

Dotyczy:✅ inżynierowie danych i Nauka o danych w usłudze Microsoft Fabric

Przesyłanie zadań wsadowych platformy Spark przy użyciu interfejsu API usługi Livy for Fabric inżynierowie danych ing.

Wymagania wstępne

Interfejs API usługi Livy definiuje ujednolicony punkt końcowy dla operacji. Zastąp symbole zastępcze {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} i {Fabric_LakehouseID} odpowiednimi wartościami, korzystając z przykładów w tym artykule.

Konfigurowanie programu Visual Studio Code dla sesji interfejsu API usługi Livy

  1. Wybierz pozycję Lakehouse Settings (Ustawienia usługi Lakehouse) w usłudze Fabric Lakehouse.

    Zrzut ekranu przedstawiający ustawienia usługi Lakehouse.

  2. Przejdź do sekcji Punkt końcowy usługi Livy.

    Zrzut ekranu przedstawiający parametry połączenia punktu końcowego usługi Lakehouse Livy i zadania sesji.

  3. Skopiuj zadanie sesji parametry połączenia (pierwsze czerwone pole na obrazie) do kodu.

  4. Przejdź do centrum administracyjnego firmy Microsoft Entra i skopiuj identyfikator aplikacji (klienta) i identyfikator katalogu (dzierżawy) do kodu.

    Zrzut ekranu przedstawiający przegląd aplikacji interfejsu API usługi Livy w centrum administracyjnym firmy Microsoft Entra.

Tworzenie sesji platformy Spark interfejsu API usługi Livy

  1. .ipynb Utwórz notes w programie Visual Studio Code i wstaw następujący kod.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. Uruchom komórkę notesu. W przeglądarce powinno pojawić się okno podręczne umożliwiające wybranie tożsamości do zalogowania się.

    Zrzut ekranu przedstawiający ekran logowania do aplikacji Microsoft Entra.

  3. Po wybraniu tożsamości do logowania się zostanie również wyświetlony monit o zatwierdzenie uprawnień interfejsu API rejestracji aplikacji Firmy Microsoft Entra.

    Zrzut ekranu przedstawiający uprawnienia interfejsu API aplikacji Entra firmy Microsoft.

  4. Zamknij okno przeglądarki po zakończeniu uwierzytelniania.

    Zrzut ekranu przedstawiający ukończone uwierzytelnianie.

  5. W programie Visual Studio Code powinien zostać zwrócony token Entra firmy Microsoft.

    Zrzut ekranu przedstawiający token Entra firmy Microsoft zwrócony po uruchomieniu komórki i zalogowaniu.

  6. Dodaj kolejną komórkę notesu i wstaw ten kod.

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. Uruchom komórkę notesu. Powinien zostać wyświetlony jeden wiersz wydrukowany podczas tworzenia sesji usługi Livy.

    Zrzut ekranu przedstawiający wyniki pierwszego wykonania komórki notesu.

  8. Możesz sprawdzić, czy sesja usługi Livy została utworzona przy użyciu pozycji [Wyświetl zadania w centrum monitorowania](#View zadania w centrum monitorowania).

Przesyłanie instrukcji spark.sql przy użyciu sesji usługi Livy API Spark

  1. Dodaj kolejną komórkę notesu i wstaw ten kod.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Uruchom komórkę notesu. Powinno zostać wyświetlonych kilka wierszy przyrostowych wydrukowanych podczas przesyłania zadania i zwracanych wyników.

    Zrzut ekranu przedstawiający wyniki pierwszej komórki notesu z Spark.sql wykonaniem.

Prześlij drugą instrukcję spark.sql przy użyciu sesji usługi Livy API Spark

  1. Dodaj kolejną komórkę notesu i wstaw ten kod.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Uruchom komórkę notesu. Powinno zostać wyświetlonych kilka wierszy przyrostowych wydrukowanych podczas przesyłania zadania i zwracanych wyników.

    Zrzut ekranu przedstawiający wyniki wykonania drugiej komórki notesu.

Zamknij sesję usługi Livy za pomocą trzeciej instrukcji

  1. Dodaj kolejną komórkę notesu i wstaw ten kod.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

Wyświetlanie zadań w centrum monitorowania

Aby wyświetlić różne działania platformy Apache Spark, możesz uzyskać dostęp do centrum monitorowania, wybierając pozycję Monitoruj w linkach nawigacji po lewej stronie.

  1. Gdy sesja jest w toku lub jest w stanie ukończonym, możesz wyświetlić stan sesji, przechodząc do pozycji Monitor.

    Zrzut ekranu przedstawiający poprzednie przesyłania interfejsu API usługi Livy w centrum monitorowania.

  2. Wybierz i otwórz najnowszą nazwę działania.

    Zrzut ekranu przedstawiający najnowsze działanie interfejsu API usługi Livy w centrum monitorowania.

  3. W tym przypadku sesji interfejsu API usługi Livy można wyświetlić poprzednie przesłania sesji, szczegóły uruchomienia, wersje platformy Spark i konfigurację. Zwróć uwagę na zatrzymany stan w prawym górnym rogu.

    Zrzut ekranu przedstawiający najnowsze szczegóły działania interfejsu API usługi Livy w centrum monitorowania.

Aby podsumować cały proces, potrzebny jest klient zdalny, taki jak Visual Studio Code, token aplikacji Microsoft Entra, adres URL punktu końcowego interfejsu API usługi Livy, uwierzytelnianie względem usługi Lakehouse, a na koniec interfejs API usługi Livy sesji.