Partilhar via


Use a API do Livy para enviar e executar trabalhos de sessão

Nota

A API Livy para engenharia de dados de malha está em visualização.

Aplica-se a:✅ Engenharia de Dados e Ciência de Dados no Microsoft Fabric

Envie trabalhos em lote do Spark usando a API do Livy para engenharia de dados de malha.

Pré-requisitos

A API Livy define um ponto de extremidade unificado para operações. Substitua os espaços reservados {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} e {Fabric_LakehouseID} pelos valores apropriados ao seguir os exemplos deste artigo.

Configurar o código do Visual Studio para sua sessão de API Livy

  1. Selecione Lakehouse Settings no seu Fabric Lakehouse.

    Captura de tela mostrando as configurações do Lakehouse.

  2. Navegue até a seção Livy endpoint .

    captura de tela mostrando o ponto de extremidade Lakehouse Livy e a cadeia de conexão de trabalho Session.

  3. Copie a cadeia de conexão do trabalho de sessão (primeira caixa vermelha na imagem) para o código.

  4. Navegue até o Centro de administração do Microsoft Entra e copie a ID do aplicativo (cliente) e a ID do diretório (locatário) para o código.

    Captura de tela mostrando a visão geral do aplicativo Livy API no centro de administração do Microsoft Entra.

Criar uma sessão do Livy API Spark

  1. Crie um bloco de .ipynb anotações no Visual Studio Code e insira o código a seguir.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. Execute a célula do bloco de anotações, um pop-up deve aparecer no seu navegador permitindo que você escolha a identidade com a qual entrar.

    Captura de ecrã a mostrar o ecrã de início de sessão na aplicação Microsoft Entra.

  3. Depois de escolher a identidade com a qual entrar, você também será solicitado a aprovar as permissões da API de registro do aplicativo Microsoft Entra.

    Captura de tela mostrando as permissões da API do aplicativo Microsoft Entra.

  4. Feche a janela do navegador depois de concluir a autenticação.

    Captura de tela mostrando a autenticação concluída.

  5. No Visual Studio Code, você verá o token do Microsoft Entra retornado.

    Captura de tela mostrando o token do Microsoft Entra retornado após executar a célula e fazer login.

  6. Adicione outra célula do bloco de notas e insira este código.

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. Execute a célula do bloco de anotações, você verá uma linha impressa enquanto a sessão Livy é criada.

    Captura de ecrã a mostrar os resultados da primeira execução da célula do bloco de notas.

  8. Você pode verificar se a sessão Livy foi criada usando o botão [Exibir seus trabalhos no hub de Monitoramento](#View seus trabalhos no hub de Monitoramento).

Enviar uma declaração de spark.sql usando a sessão Livy API Spark

  1. Adicione outra célula do bloco de notas e insira este código.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Execute a célula do bloco de anotações, você verá várias linhas incrementais impressas à medida que o trabalho é enviado e os resultados retornados.

    Captura de ecrã a mostrar os resultados da primeira célula do bloco de notas com execução Spark.sql.

Envie uma segunda declaração de spark.sql usando a sessão Livy API Spark

  1. Adicione outra célula do bloco de notas e insira este código.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Execute a célula do bloco de anotações, você verá várias linhas incrementais impressas à medida que o trabalho é enviado e os resultados retornados.

    Captura de ecrã a mostrar os resultados da execução da segunda célula do bloco de notas.

Encerre a sessão do Lívio com uma terceira declaração

  1. Adicione outra célula do bloco de notas e insira este código.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

Exibir seus trabalhos no hub de monitoramento

Você pode acessar o hub de monitoramento para visualizar várias atividades do Apache Spark selecionando Monitor nos links de navegação do lado esquerdo.

  1. Quando a sessão estiver em andamento ou concluída, você poderá visualizar o status da sessão navegando até o Monitor.

    Captura de tela mostrando envios anteriores da API Livy no hub de monitoramento.

  2. Selecione e abra o nome da atividade mais recente.

    Captura de tela mostrando a atividade mais recente da API Livy no hub de monitoramento.

  3. Neste caso de sessão da API Livy, você pode ver seus envios de sessões anteriores, detalhes de execução, versões do Spark e configuração. Observe o status interrompido no canto superior direito.

    Captura de tela mostrando os detalhes mais recentes da atividade da API Livy no hub de monitoramento.

Para recapitular todo o processo, você precisa de um cliente remoto, como o Visual Studio Code, um token de aplicativo Microsoft Entra, URL de ponto de extremidade da API Livy, autenticação em seu Lakehouse e, finalmente, uma API Session Livy.