Uso de Livy API para enviar y ejecutar trabajos de sesión
Nota:
Livy API para Ingeniería de datos de Fabric está en versión preliminar.
Se aplica a:✅ Ingeniería de datos y ciencia de datos en Microsoft Fabric
Envíe trabajos por lotes de Spark mediante Livy API para Ingeniería de datos de Fabric.
Requisitos previos
Fabric Premium o Capacidad de prueba con una instancia de almacén de lago de datos.
Un cliente remoto, como Visual Studio Code con cuadernos de Jupyter Notebook, PySpark y la biblioteca de autenticación de Microsoft (MSAL) para Python.
Se requiere un token de aplicación de Microsoft Entra para acceder a la API de REST de Fabric. Registro de una aplicación en la Plataforma de identidad de Microsoft.
Algunos datos de su instancia de almacén de lago de datos, en este ejemplo se usa NYC Taxi & Limousine Commission green_tripdata_2022_08 un archivo parquet cargado en el almacén de lago de datos.
Livy API define un punto de conexión unificado para las operaciones. Reemplace los marcadores de posición {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}y {Fabric_LakehouseID} por los valores adecuados al seguir los ejemplos de este artículo.
Configuración de Visual Studio Code para la sesión de Livy API
Seleccione Configuración de almacén de lago de datos en el almacén de lago de datos de Fabric.
Vaya a la sección punto de conexión de Livy.
Copie la cadena de conexión del trabajo de sesión (primer cuadro rojo de la imagen) en el código.
Vaya a Centro de administración de Microsoft Entra y copie el identificador de aplicación (cliente) y el identificador de directorio (inquilino) en el código.
Creación de una sesión de Spark de Livy API
Cree un cuaderno
.ipynb
en Visual Studio Code e inserte el siguiente código.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
Ejecute la celda del cuaderno; debería aparecer un elemento emergente en el explorador, lo que le permite elegir la identidad con la que iniciar sesión.
Después de elegir la identidad con la que iniciar sesión, también se le pedirá que apruebe los permisos de la API de registro de aplicaciones de Microsoft Entra.
Cierre la ventana del explorador después de completar la autenticación.
En Visual Studio Code, debería ver el token de Microsoft Entra devuelto.
Agregue otra celda del cuaderno e inserte este código.
create_livy_session = requests.post(livy_base_url, headers=headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json())
Ejecute la celda del cuaderno; debería ver una línea impresa a medida que se crea la sesión de Livy.
Puede comprobar que la sesión de Livy se crea mediante [Ver los trabajos en el centro de supervisión](#Ver los trabajos en el centro de supervisión).
Envío de una instrucción spark.sql mediante la sesión de Spark de Livy API
Agregue otra celda del cuaderno e inserte este código.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Ejecute la celda del cuaderno; debería ver varias líneas incrementales impresas a medida que se envía el trabajo y se devuelven los resultados.
Envío de una segunda instrucción spark.sql mediante la sesión de Spark de Livy API
Agregue otra celda del cuaderno e inserte este código.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Ejecute la celda del cuaderno; debería ver varias líneas incrementales impresas a medida que se envía el trabajo y se devuelven los resultados.
Cierre de la sesión de Livy con una tercera instrucción
Agregue otra celda del cuaderno e inserte este código.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, headers=headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers=headers) print (response)
Visualización de los trabajos en el centro de supervisión
Puede acceder al centro de supervisión para ver varias actividades de Apache Spark al seleccionar Supervisar en los vínculos de navegación del lado izquierdo.
Cuando la sesión está en curso o en estado completado, puede ver el estado de la sesión; para ello, vaya Supervisar.
Seleccione y abra el nombre de la actividad más reciente.
En este caso de sesión de Livy API, puede ver los envíos de sesiones anteriores, los detalles de ejecución, las versiones de Spark y la configuración. Observe el estado detenido en la parte superior derecha.
Para resumir todo el proceso, necesita un cliente remoto, como Visual Studio Code, un token de aplicación de Microsoft Entra, una dirección URL del punto de conexión de Livy API, la autenticación en el almacén de lago de datos y, por último, una sesión de Livy API.