Compartilhar via


Usar a API Livy para enviar e executar trabalhos de sessão

Observação

A API Livy para engenharia de dados do Fabric está em versão prévia.

Aplica-se a:✅ Engenharia e Ciência de Dados no Microsoft Fabric

Envie trabalhos em lotes do Spark usando a API Livy para Engenharia de Dados do Fabric.

Pré-requisitos

A API Livy define um endpoint unificado para operações. Substitua os espaços reservados {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} e {Fabric_LakehouseID} pelos valores apropriados ao seguir os exemplos neste artigo.

Configurar o Visual Studio Code para sua sessão da API Livy

  1. Selecione Configurações do Lakehouse no Fabric Lakehouse.

    Captura de tela mostrando as configurações do Lakehouse.

  2. Navegue até a seção Ponto de extremidade do Livy.

    captura de tela mostrando o ponto de extremidade do Lakehouse Livy e a cadeia de conexão do trabalho de sessão.

  3. Copie a cadeia de conexão do trabalho de sessão (primeira caixa vermelha na imagem) para o código.

  4. Navegue até Centro de administração do Microsoft Entra e copie a ID do aplicativo (cliente) e a ID do diretório (locatário) para o código.

    Captura de tela mostrando a visão geral do aplicativo Livy API no centro de administração do Microsoft Entra.

Criar uma sessão do Spark da API do Livy

  1. Crie um .ipynb notebook no Visual Studio Code e insira o código a seguir.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. Execute a célula do bloco de anotações, um pop-up deve aparecer no seu navegador, permitindo que você escolha a identidade com a qual entrar.

    Captura de tela mostrando a tela de logon no aplicativo Microsoft Entra.

  3. Depois de escolher a identidade com a qual entrar, você também será solicitado a aprovar as permissões da API de registro do aplicativo Microsoft Entra.

    Captura de tela mostrando as permissões da API do aplicativo Microsoft Entra.

  4. Feche a janela do navegador após concluir a autenticação.

    Captura de tela mostrando a autenticação concluída.

  5. No Visual Studio Code, você deve ver o token do Microsoft Entra retornado.

    Captura de tela mostrando o token do Microsoft Entra retornado após a execução da célula e o login.

  6. Adicione outra célula do notebook e insira este código.

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. Execute a célula do notebook, você verá uma linha impressa à medida que a sessão do Livy é criada.

    Captura de tela mostrando os resultados da primeira execução da célula do notebook.

  8. Você pode verificar se a sessão do Livy foi criada usando o [Exibir seus trabalhos no hub de monitoramento](#View seus trabalhos no hub de monitoramento).

Enviar uma instrução spark.sql usando a sessão do Spark da API Livy

  1. Adicione outra célula do notebook e insira este código.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Execute a célula do notebook, você deverá ver várias linhas incrementais impressas à medida que o trabalho é enviado e os resultados retornados.

    Captura de tela mostrando os resultados da primeira célula do notebook com Spark.sql execução.

Enviar uma segunda instrução spark.sql usando a sessão do Spark da API Livy

  1. Adicione outra célula do notebook e insira este código.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Execute a célula do notebook, você deverá ver várias linhas incrementais impressas à medida que o trabalho é enviado e os resultados retornados.

    Captura de tela mostrando os resultados da segunda execução da célula do notebook.

Feche a sessão de Tito Lívio com uma terceira declaração

  1. Adicione outra célula do notebook e insira este código.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

Exibir seus trabalhos no hub de monitoramento

Você pode acessar o hub de monitoramento para exibir várias atividades do Apache Spark selecionando Monitorar nos links de navegação do lado esquerdo.

  1. Quando a sessão está em andamento ou no estado concluído, você pode visualizar o status da sessão navegando até Monitorar.

    Captura de tela mostrando envios anteriores da API Livy no hub de monitoramento.

  2. Selecione e abra o nome da atividade mais recente.

    Captura de tela mostrando a atividade mais recente da API Livy no hub de monitoramento.

  3. Neste caso de sessão da API Livy, você pode ver seus envios de sessões anteriores, detalhes de execução, versões do Spark e configuração. Observe o status parado no canto superior direito.

    Captura de tela mostrando os detalhes mais recentes da atividade da API Livy no Hub de Monitoramento.

Para recapitular todo o processo, você precisa de um cliente remoto, como Visual Studio Code, um token de aplicativo do Microsoft Entra, URL do ponto de extremidade da API Livy, autenticação em seu Lakehouse e, finalmente, uma API do Livy de sessão.