Usar a API Livy para enviar e executar trabalhos de sessão
Observação
A API Livy para engenharia de dados do Fabric está em versão prévia.
Aplica-se a:✅ Engenharia e Ciência de Dados no Microsoft Fabric
Envie trabalhos em lotes do Spark usando a API Livy para Engenharia de Dados do Fabric.
Pré-requisitos
Tecido Premium ou capacidade de teste com um Lakehouse.
Um cliente remoto, como o Visual Studio Code com Jupyter Notebooks, PySpark e a Microsoft Authentication Library (MSAL) para Python.
Um token de aplicativo do Microsoft Entra é necessário para acessar a API Rest do Fabric. Registrar um aplicativo na plataforma de identidade da Microsoft.
Alguns dados em seu lakehouse, este exemplo usa NYC Taxi & Limousine Commission green_tripdata_2022_08 um arquivo de parquet carregado no lakehouse.
A API Livy define um endpoint unificado para operações. Substitua os espaços reservados {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} e {Fabric_LakehouseID} pelos valores apropriados ao seguir os exemplos neste artigo.
Configurar o Visual Studio Code para sua sessão da API Livy
Selecione Configurações do Lakehouse no Fabric Lakehouse.
Navegue até a seção Ponto de extremidade do Livy.
Copie a cadeia de conexão do trabalho de sessão (primeira caixa vermelha na imagem) para o código.
Navegue até Centro de administração do Microsoft Entra e copie a ID do aplicativo (cliente) e a ID do diretório (locatário) para o código.
Criar uma sessão do Spark da API do Livy
Crie um
.ipynb
notebook no Visual Studio Code e insira o código a seguir.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
Execute a célula do bloco de anotações, um pop-up deve aparecer no seu navegador, permitindo que você escolha a identidade com a qual entrar.
Depois de escolher a identidade com a qual entrar, você também será solicitado a aprovar as permissões da API de registro do aplicativo Microsoft Entra.
Feche a janela do navegador após concluir a autenticação.
No Visual Studio Code, você deve ver o token do Microsoft Entra retornado.
Adicione outra célula do notebook e insira este código.
create_livy_session = requests.post(livy_base_url, headers=headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json())
Execute a célula do notebook, você verá uma linha impressa à medida que a sessão do Livy é criada.
Você pode verificar se a sessão do Livy foi criada usando o [Exibir seus trabalhos no hub de monitoramento](#View seus trabalhos no hub de monitoramento).
Enviar uma instrução spark.sql usando a sessão do Spark da API Livy
Adicione outra célula do notebook e insira este código.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Execute a célula do notebook, você deverá ver várias linhas incrementais impressas à medida que o trabalho é enviado e os resultados retornados.
Enviar uma segunda instrução spark.sql usando a sessão do Spark da API Livy
Adicione outra célula do notebook e insira este código.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Execute a célula do notebook, você deverá ver várias linhas incrementais impressas à medida que o trabalho é enviado e os resultados retornados.
Feche a sessão de Tito Lívio com uma terceira declaração
Adicione outra célula do notebook e insira este código.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, headers=headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers=headers) print (response)
Exibir seus trabalhos no hub de monitoramento
Você pode acessar o hub de monitoramento para exibir várias atividades do Apache Spark selecionando Monitorar nos links de navegação do lado esquerdo.
Quando a sessão está em andamento ou no estado concluído, você pode visualizar o status da sessão navegando até Monitorar.
Selecione e abra o nome da atividade mais recente.
Neste caso de sessão da API Livy, você pode ver seus envios de sessões anteriores, detalhes de execução, versões do Spark e configuração. Observe o status parado no canto superior direito.
Para recapitular todo o processo, você precisa de um cliente remoto, como Visual Studio Code, um token de aplicativo do Microsoft Entra, URL do ponto de extremidade da API Livy, autenticação em seu Lakehouse e, finalmente, uma API do Livy de sessão.