Przesyłanie i wykonywanie zadań sesji przy użyciu interfejsu API usługi Livy
Uwaga
Interfejs API usługi Livy dla usługi Fabric inżynierowie danych jest w wersji zapoznawczej.
Dotyczy:✅ inżynierowie danych i Nauka o danych w usłudze Microsoft Fabric
Przesyłanie zadań wsadowych platformy Spark przy użyciu interfejsu API usługi Livy for Fabric inżynierowie danych ing.
Wymagania wstępne
Pojemność premium lub wersja próbna sieci szkieletowej z usługą Lakehouse.
Klient zdalny, taki jak Program Visual Studio Code z notesami Jupyter Notebook, PySpark i biblioteką Microsoft Authentication Library (MSAL) dla języka Python.
Token aplikacji Entra firmy Microsoft jest wymagany do uzyskania dostępu do interfejsu API REST sieci szkieletowej. Zarejestruj aplikację w Platforma tożsamości Microsoft.
Niektóre dane w lakehouse, w tym przykładzie użyto NYC Taxi & Limousine Commission green_tripdata_2022_08 plik parquet załadowany do jeziora.
Interfejs API usługi Livy definiuje ujednolicony punkt końcowy dla operacji. Zastąp symbole zastępcze {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} i {Fabric_LakehouseID} odpowiednimi wartościami, korzystając z przykładów w tym artykule.
Konfigurowanie programu Visual Studio Code dla sesji interfejsu API usługi Livy
Wybierz pozycję Lakehouse Settings (Ustawienia usługi Lakehouse) w usłudze Fabric Lakehouse.
Przejdź do sekcji Punkt końcowy usługi Livy.
Skopiuj zadanie sesji parametry połączenia (pierwsze czerwone pole na obrazie) do kodu.
Przejdź do centrum administracyjnego firmy Microsoft Entra i skopiuj identyfikator aplikacji (klienta) i identyfikator katalogu (dzierżawy) do kodu.
Tworzenie sesji platformy Spark interfejsu API usługi Livy
.ipynb
Utwórz notes w programie Visual Studio Code i wstaw następujący kod.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
Uruchom komórkę notesu. W przeglądarce powinno pojawić się okno podręczne umożliwiające wybranie tożsamości do zalogowania się.
Po wybraniu tożsamości do logowania się zostanie również wyświetlony monit o zatwierdzenie uprawnień interfejsu API rejestracji aplikacji Firmy Microsoft Entra.
Zamknij okno przeglądarki po zakończeniu uwierzytelniania.
W programie Visual Studio Code powinien zostać zwrócony token Entra firmy Microsoft.
Dodaj kolejną komórkę notesu i wstaw ten kod.
create_livy_session = requests.post(livy_base_url, headers=headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json())
Uruchom komórkę notesu. Powinien zostać wyświetlony jeden wiersz wydrukowany podczas tworzenia sesji usługi Livy.
Możesz sprawdzić, czy sesja usługi Livy została utworzona przy użyciu pozycji [Wyświetl zadania w centrum monitorowania](#View zadania w centrum monitorowania).
Przesyłanie instrukcji spark.sql przy użyciu sesji usługi Livy API Spark
Dodaj kolejną komórkę notesu i wstaw ten kod.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Uruchom komórkę notesu. Powinno zostać wyświetlonych kilka wierszy przyrostowych wydrukowanych podczas przesyłania zadania i zwracanych wyników.
Prześlij drugą instrukcję spark.sql przy użyciu sesji usługi Livy API Spark
Dodaj kolejną komórkę notesu i wstaw ten kod.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Uruchom komórkę notesu. Powinno zostać wyświetlonych kilka wierszy przyrostowych wydrukowanych podczas przesyłania zadania i zwracanych wyników.
Zamknij sesję usługi Livy za pomocą trzeciej instrukcji
Dodaj kolejną komórkę notesu i wstaw ten kod.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, headers=headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers=headers) print (response)
Wyświetlanie zadań w centrum monitorowania
Aby wyświetlić różne działania platformy Apache Spark, możesz uzyskać dostęp do centrum monitorowania, wybierając pozycję Monitoruj w linkach nawigacji po lewej stronie.
Gdy sesja jest w toku lub jest w stanie ukończonym, możesz wyświetlić stan sesji, przechodząc do pozycji Monitor.
Wybierz i otwórz najnowszą nazwę działania.
W tym przypadku sesji interfejsu API usługi Livy można wyświetlić poprzednie przesłania sesji, szczegóły uruchomienia, wersje platformy Spark i konfigurację. Zwróć uwagę na zatrzymany stan w prawym górnym rogu.
Aby podsumować cały proces, potrzebny jest klient zdalny, taki jak Visual Studio Code, token aplikacji Microsoft Entra, adres URL punktu końcowego interfejsu API usługi Livy, uwierzytelnianie względem usługi Lakehouse, a na koniec interfejs API usługi Livy sesji.