Compartir a través de


Uso de Livy API para enviar y ejecutar trabajos de sesión

Nota:

Livy API para Ingeniería de datos de Fabric está en versión preliminar.

Se aplica a:✅ Ingeniería de datos y ciencia de datos en Microsoft Fabric

Envíe trabajos por lotes de Spark mediante Livy API para Ingeniería de datos de Fabric.

Requisitos previos

Livy API define un punto de conexión unificado para las operaciones. Reemplace los marcadores de posición {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}y {Fabric_LakehouseID} por los valores adecuados al seguir los ejemplos de este artículo.

Configuración de Visual Studio Code para la sesión de Livy API

  1. Seleccione Configuración de almacén de lago de datos en el almacén de lago de datos de Fabric.

    Recorte de pantalla que muestra la configuración del almacén de lago de datos.

  2. Vaya a la sección punto de conexión de Livy.

    Recorte de pantalla que muestra el punto de conexión del almacén de lago de datos de Livy y la cadena de conexión del trabajo de sesión.

  3. Copie la cadena de conexión del trabajo de sesión (primer cuadro rojo de la imagen) en el código.

  4. Vaya a Centro de administración de Microsoft Entra y copie el identificador de aplicación (cliente) y el identificador de directorio (inquilino) en el código.

    Captura de pantalla que muestra información general de la aplicación Livy API en el Centro de administración de Microsoft Entra.

Creación de una sesión de Spark de Livy API

  1. Cree un cuaderno .ipynb en Visual Studio Code e inserte el siguiente código.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. Ejecute la celda del cuaderno; debería aparecer un elemento emergente en el explorador, lo que le permite elegir la identidad con la que iniciar sesión.

    Recorte de pantalla que muestra la pantalla de inicio de sesión en la aplicación Microsoft Entra.

  3. Después de elegir la identidad con la que iniciar sesión, también se le pedirá que apruebe los permisos de la API de registro de aplicaciones de Microsoft Entra.

    Recorte de pantalla que muestra los permisos de la API de aplicaciones de Microsoft Entra.

  4. Cierre la ventana del explorador después de completar la autenticación.

    Recorte de pantalla que muestra la autenticación completada.

  5. En Visual Studio Code, debería ver el token de Microsoft Entra devuelto.

    Recorte de pantalla que muestra el token de Microsoft Entra devuelto después de ejecutar la celda y iniciar sesión.

  6. Agregue otra celda del cuaderno e inserte este código.

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. Ejecute la celda del cuaderno; debería ver una línea impresa a medida que se crea la sesión de Livy.

    Recorte de pantalla que muestra los resultados de la primera ejecución de celdas del cuaderno.

  8. Puede comprobar que la sesión de Livy se crea mediante [Ver los trabajos en el centro de supervisión](#Ver los trabajos en el centro de supervisión).

Envío de una instrucción spark.sql mediante la sesión de Spark de Livy API

  1. Agregue otra celda del cuaderno e inserte este código.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Ejecute la celda del cuaderno; debería ver varias líneas incrementales impresas a medida que se envía el trabajo y se devuelven los resultados.

    Recorte de pantalla que muestra los resultados de la primera celda del cuaderno con Spark.sql ejecución.

Envío de una segunda instrucción spark.sql mediante la sesión de Spark de Livy API

  1. Agregue otra celda del cuaderno e inserte este código.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Ejecute la celda del cuaderno; debería ver varias líneas incrementales impresas a medida que se envía el trabajo y se devuelven los resultados.

    Recorte de pantalla que muestra los resultados de la segunda ejecución de celdas del cuaderno.

Cierre de la sesión de Livy con una tercera instrucción

  1. Agregue otra celda del cuaderno e inserte este código.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

Visualización de los trabajos en el centro de supervisión

Puede acceder al centro de supervisión para ver varias actividades de Apache Spark al seleccionar Supervisar en los vínculos de navegación del lado izquierdo.

  1. Cuando la sesión está en curso o en estado completado, puede ver el estado de la sesión; para ello, vaya Supervisar.

    Recorte de pantalla que muestra los envíos anteriores de Livy API en el centro de supervisión.

  2. Seleccione y abra el nombre de la actividad más reciente.

    Recorte de pantalla que muestra la actividad de Livy API más reciente en el centro de supervisión.

  3. En este caso de sesión de Livy API, puede ver los envíos de sesiones anteriores, los detalles de ejecución, las versiones de Spark y la configuración. Observe el estado detenido en la parte superior derecha.

    Recorte de pantalla que muestra los detalles de la actividad de Livy API más reciente en el centro de supervisión.

Para resumir todo el proceso, necesita un cliente remoto, como Visual Studio Code, un token de aplicación de Microsoft Entra, una dirección URL del punto de conexión de Livy API, la autenticación en el almacén de lago de datos y, por último, una sesión de Livy API.