Partager via


Utiliser l’API Livy pour envoyer et exécuter des travaux de session

Remarque

L’API Livy pour Ingénieurs de données Fabric est en préversion.

S’applique à :✅ l’engineering et la science des données dans Microsoft Fabric

Envoyez des travaux de traitement par lots Spark à l’aide de l’API Livy pour Ingénieurs de données Fabric.

Prérequis

L’API Livy définit un point de terminaison unifié pour les opérations. Remplacez les espaces réservés {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, et {Fabric_LakehouseID} par vos valeurs appropriées lorsque vous suivez les exemples de cet article.

Configurer Visual Studio Code pour votre session d’API Livy

  1. Sélectionnez Paramètres Lakehouse dans votre lakehouse Fabric.

    Capture d’écran montrant les paramètres de lakehouse.

  2. Accédez à la section Point de terminaison Livy.

    Capture d’écran montrant le point de terminaison de lakehouse Livy et la chaîne de connexion de travail de session.

  3. Copiez la chaîne de connexion du travail de session (première zone rouge dans l’image) dans votre code.

  4. Accédez au centre d’administration Microsoft Entra et copiez l’ID d’application (client) et l’ID d’annuaire (locataire) dans votre code.

    Capture d’écran montrant la vue d’ensemble de l’application d’API Livy dans le centre d’administration Microsoft Entra.

Créer une session Spark d’API Livy

  1. Créez un notebook .ipynb dans Visual Studio Code et insérez le code suivant.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. Exécutez la cellule de notebook. Une fenêtre contextuelle doit apparaître dans votre navigateur, ce qui vous permet de choisir l’identité avec laquelle vous connecter.

    Capture d’écran montrant l’écran de connexion à l’application Microsoft Entra.

  3. Une fois que vous avez choisi l’identité à utiliser pour vous connecter, vous êtes également invité à approuver les autorisations de l’API d’inscription d’application Microsoft Entra.

    Capture d’écran montrant les autorisations de l’API d’application Microsoft Entra.

  4. Fermez la fenêtre du navigateur après avoir procédé à l’authentification.

    Capture d’écran montrant l’authentification réussie.

  5. Dans Visual Studio Code, vous devez voir le jeton Microsoft Entra retourné.

    Capture d’écran montrant le jeton Microsoft Entra retourné après l’exécution de la cellule et la connexion.

  6. Ajoutez une autre cellule de notebook et insérez ce code.

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. Exécutez la cellule de notebook. Vous devez voir une ligne imprimée lorsque la session Livy est créée.

    Capture d’écran montrant les résultats de la première exécution de cellule de notebook.

  8. Vous pouvez vérifier que la session Livy est créée à l’aide de [Afficher vos travaux dans le hub de supervision](#Afficher vos travaux dans le hub de supervision).

Envoyer une instruction spark.sql à l’aide de la session Spark de l’API Livy

  1. Ajoutez une autre cellule de notebook et insérez ce code.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Exécutez la cellule de notebook. Vous devez voir plusieurs lignes incrémentielles imprimées à mesure que le travail est envoyé et les résultats retournés.

    Capture d’écran montrant les résultats de la première cellule de notebook avec exécution Spark.sql.

Envoyer une deuxième instruction spark.sql à l’aide de la session Spark de l’API Livy

  1. Ajoutez une autre cellule de notebook et insérez ce code.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Exécutez la cellule de notebook. Vous devez voir plusieurs lignes incrémentielles imprimées à mesure que le travail est envoyé et les résultats retournés.

    Capture d’écran montrant les résultats de la deuxième exécution de cellule de notebook.

Fermer la session Livy avec une troisième instruction

  1. Ajoutez une autre cellule de notebook et insérez ce code.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

Afficher vos travaux dans le hub de supervision

Vous pouvez accéder au hub de supervision pour afficher diverses activités Apache Spark en sélectionnant Superviser dans les liens de navigation de gauche.

  1. Lorsque la session est en cours ou à l’état terminé, vous pouvez afficher l’état de la session en accédant à Superviser.

    Capture d’écran montrant les envois d’API Livy précédents dans le hub de supervision.

  2. Sélectionnez et ouvrez le nom de l’activité la plus récente.

    Capture d’écran montrant l’activité la plus récente de l’API Livy dans le hub de supervision.

  3. Dans ce cas de session API Livy, vous pouvez voir vos envois de sessions précédents, les détails d’exécution, les versions Spark et la configuration. Notez l’état Arrêté en haut à droite.

    Capture d’écran montrant les détails de l’activité la plus récente de l’API Livy dans le hub de supervision.

Pour récapituler l’ensemble du processus, vous avez besoin d’un client distant tel que Visual Studio Code, d’un jeton d’application Microsoft Entra, de l’URL du point de terminaison de l’API Livy, de l’authentification auprès de votre Lakehouse, et enfin d’une session d’API Livy.