Utiliser l’API Livy pour envoyer et exécuter des travaux de session
Remarque
L’API Livy pour Ingénieurs de données Fabric est en préversion.
S’applique à :✅ l’engineering et la science des données dans Microsoft Fabric
Envoyez des travaux de traitement par lots Spark à l’aide de l’API Livy pour Ingénieurs de données Fabric.
Prérequis
Capacité Fabric Premium ou d’évaluation avec un lakehouse.
Un client distant tel que Visual Studio Code avec Jupyter Notebooks, PySpark et la bibliothèque d’authentification Microsoft (MSAL) pour Python.
Un jeton d’app Microsoft Entra est nécessaire pour accéder à l’API REST Fabric. Inscrire une application à l’aide de la plateforme d’identités Microsoft.
Certaines données dans votre lakehouse. Cet exemple utilise NYC Taxi & Limousine Commission green_tripdata_2022_08. Un fichier parquet chargé dans le lakehouse.
L’API Livy définit un point de terminaison unifié pour les opérations. Remplacez les espaces réservés {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, et {Fabric_LakehouseID} par vos valeurs appropriées lorsque vous suivez les exemples de cet article.
Configurer Visual Studio Code pour votre session d’API Livy
Sélectionnez Paramètres Lakehouse dans votre lakehouse Fabric.
Accédez à la section Point de terminaison Livy.
Copiez la chaîne de connexion du travail de session (première zone rouge dans l’image) dans votre code.
Accédez au centre d’administration Microsoft Entra et copiez l’ID d’application (client) et l’ID d’annuaire (locataire) dans votre code.
Créer une session Spark d’API Livy
Créez un notebook
.ipynb
dans Visual Studio Code et insérez le code suivant.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
Exécutez la cellule de notebook. Une fenêtre contextuelle doit apparaître dans votre navigateur, ce qui vous permet de choisir l’identité avec laquelle vous connecter.
Une fois que vous avez choisi l’identité à utiliser pour vous connecter, vous êtes également invité à approuver les autorisations de l’API d’inscription d’application Microsoft Entra.
Fermez la fenêtre du navigateur après avoir procédé à l’authentification.
Dans Visual Studio Code, vous devez voir le jeton Microsoft Entra retourné.
Ajoutez une autre cellule de notebook et insérez ce code.
create_livy_session = requests.post(livy_base_url, headers=headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json())
Exécutez la cellule de notebook. Vous devez voir une ligne imprimée lorsque la session Livy est créée.
Vous pouvez vérifier que la session Livy est créée à l’aide de [Afficher vos travaux dans le hub de supervision](#Afficher vos travaux dans le hub de supervision).
Envoyer une instruction spark.sql à l’aide de la session Spark de l’API Livy
Ajoutez une autre cellule de notebook et insérez ce code.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Exécutez la cellule de notebook. Vous devez voir plusieurs lignes incrémentielles imprimées à mesure que le travail est envoyé et les résultats retournés.
Envoyer une deuxième instruction spark.sql à l’aide de la session Spark de l’API Livy
Ajoutez une autre cellule de notebook et insérez ce code.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Exécutez la cellule de notebook. Vous devez voir plusieurs lignes incrémentielles imprimées à mesure que le travail est envoyé et les résultats retournés.
Fermer la session Livy avec une troisième instruction
Ajoutez une autre cellule de notebook et insérez ce code.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, headers=headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers=headers) print (response)
Afficher vos travaux dans le hub de supervision
Vous pouvez accéder au hub de supervision pour afficher diverses activités Apache Spark en sélectionnant Superviser dans les liens de navigation de gauche.
Lorsque la session est en cours ou à l’état terminé, vous pouvez afficher l’état de la session en accédant à Superviser.
Sélectionnez et ouvrez le nom de l’activité la plus récente.
Dans ce cas de session API Livy, vous pouvez voir vos envois de sessions précédents, les détails d’exécution, les versions Spark et la configuration. Notez l’état Arrêté en haut à droite.
Pour récapituler l’ensemble du processus, vous avez besoin d’un client distant tel que Visual Studio Code, d’un jeton d’application Microsoft Entra, de l’URL du point de terminaison de l’API Livy, de l’authentification auprès de votre Lakehouse, et enfin d’une session d’API Livy.