Use a API do Livy para enviar e executar trabalhos de sessão
Nota
A API Livy para engenharia de dados de malha está em visualização.
Aplica-se a:✅ Engenharia de Dados e Ciência de Dados no Microsoft Fabric
Envie trabalhos em lote do Spark usando a API do Livy para engenharia de dados de malha.
Pré-requisitos
Um cliente remoto, como o Visual Studio Code com Jupyter Notebooks, o PySpark e a Microsoft Authentication Library (MSAL) para Python.
Um token de aplicativo Microsoft Entra é necessário para acessar a API Fabric Rest. Registre um aplicativo com a plataforma de identidade da Microsoft.
Alguns dados em sua casa do lago, este exemplo usa NYC Taxi & Limousine Commission green_tripdata_2022_08 um arquivo de parquet carregado na casa do lago.
A API Livy define um ponto de extremidade unificado para operações. Substitua os espaços reservados {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} e {Fabric_LakehouseID} pelos valores apropriados ao seguir os exemplos deste artigo.
Configurar o código do Visual Studio para sua sessão de API Livy
Selecione Lakehouse Settings no seu Fabric Lakehouse.
Navegue até a seção Livy endpoint .
Copie a cadeia de conexão do trabalho de sessão (primeira caixa vermelha na imagem) para o código.
Navegue até o Centro de administração do Microsoft Entra e copie a ID do aplicativo (cliente) e a ID do diretório (locatário) para o código.
Criar uma sessão do Livy API Spark
Crie um bloco de
.ipynb
anotações no Visual Studio Code e insira o código a seguir.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
Execute a célula do bloco de anotações, um pop-up deve aparecer no seu navegador permitindo que você escolha a identidade com a qual entrar.
Depois de escolher a identidade com a qual entrar, você também será solicitado a aprovar as permissões da API de registro do aplicativo Microsoft Entra.
Feche a janela do navegador depois de concluir a autenticação.
No Visual Studio Code, você verá o token do Microsoft Entra retornado.
Adicione outra célula do bloco de notas e insira este código.
create_livy_session = requests.post(livy_base_url, headers=headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json())
Execute a célula do bloco de anotações, você verá uma linha impressa enquanto a sessão Livy é criada.
Você pode verificar se a sessão Livy foi criada usando o botão [Exibir seus trabalhos no hub de Monitoramento](#View seus trabalhos no hub de Monitoramento).
Enviar uma declaração de spark.sql usando a sessão Livy API Spark
Adicione outra célula do bloco de notas e insira este código.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Execute a célula do bloco de anotações, você verá várias linhas incrementais impressas à medida que o trabalho é enviado e os resultados retornados.
Envie uma segunda declaração de spark.sql usando a sessão Livy API Spark
Adicione outra célula do bloco de notas e insira este código.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Execute a célula do bloco de anotações, você verá várias linhas incrementais impressas à medida que o trabalho é enviado e os resultados retornados.
Encerre a sessão do Lívio com uma terceira declaração
Adicione outra célula do bloco de notas e insira este código.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, headers=headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers=headers) print (response)
Exibir seus trabalhos no hub de monitoramento
Você pode acessar o hub de monitoramento para visualizar várias atividades do Apache Spark selecionando Monitor nos links de navegação do lado esquerdo.
Quando a sessão estiver em andamento ou concluída, você poderá visualizar o status da sessão navegando até o Monitor.
Selecione e abra o nome da atividade mais recente.
Neste caso de sessão da API Livy, você pode ver seus envios de sessões anteriores, detalhes de execução, versões do Spark e configuração. Observe o status interrompido no canto superior direito.
Para recapitular todo o processo, você precisa de um cliente remoto, como o Visual Studio Code, um token de aplicativo Microsoft Entra, URL de ponto de extremidade da API Livy, autenticação em seu Lakehouse e, finalmente, uma API Session Livy.