Partilhar via


Use a API do Livy para enviar e executar trabalhos em lote do Livy

Nota

A API Livy para engenharia de dados de malha está em visualização.

Aplica-se a:✅ Engenharia de Dados e Ciência de Dados no Microsoft Fabric

Envie trabalhos em lote do Spark usando a API do Livy para engenharia de dados de malha.

Pré-requisitos

A API Livy define um ponto de extremidade unificado para operações. Substitua os espaços reservados {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} e {Fabric_LakehouseID} pelos valores apropriados ao seguir os exemplos deste artigo.

Configurar o código do Visual Studio para seu lote de API Livy

  1. Selecione Lakehouse Settings no seu Fabric Lakehouse.

    Captura de tela mostrando as configurações do Lakehouse.

  2. Navegue até a seção Livy endpoint .

    captura de tela mostrando o ponto de extremidade Lakehouse Livy e a cadeia de conexão de trabalho Session.

  3. Copie a cadeia de conexão do trabalho em lote (segunda caixa vermelha na imagem) para o código.

  4. Navegue até o Centro de administração do Microsoft Entra e copie a ID do aplicativo (cliente) e a ID do diretório (locatário) para o código.

    Captura de tela mostrando a visão geral do aplicativo Livy API no centro de administração do Microsoft Entra.

Crie uma carga útil do Spark e faça o upload para a sua Lakehouse

  1. Crie um bloco de .ipynb anotações no Visual Studio Code e insira o seguinte código

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Salve o arquivo Python localmente. Esta carga útil de código Python contém duas instruções Spark que funcionam em dados em uma Lakehouse e precisam ser carregadas para sua Lakehouse. Você precisará do caminho ABFS da carga útil para referência no trabalho de lote da API Livy no Visual Studio Code e do nome da tabela Lakehouse na instrução SQL SELECT.

    Captura de tela mostrando a célula de carga útil do Python.

  3. Carregue a carga útil do Python para a seção de arquivos da sua Lakehouse. > Obter dados > Carregue ficheiros > clique na caixa Ficheiros/entrada.

    Captura de tela mostrando a carga útil na seção Arquivos da Lakehouse.

  4. Depois que o arquivo estiver na seção Arquivos da sua Lakehouse, clique nos três pontos à direita do nome do arquivo da carga útil e selecione Propriedades.

    Captura de tela mostrando o caminho ABFS da carga útil nas Propriedades do arquivo no Lakehouse.

  5. Copie este caminho ABFS para a célula do Bloco de Anotações na etapa 1.

Criar uma sessão em lote do Livy API Spark

  1. Crie um bloco de .ipynb anotações no Visual Studio Code e insira o código a seguir.

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. Execute a célula do bloco de anotações, um pop-up deve aparecer no seu navegador permitindo que você escolha a identidade com a qual entrar.

    Captura de ecrã a mostrar o ecrã de início de sessão na aplicação Microsoft Entra.

  3. Depois de escolher a identidade com a qual entrar, você também será solicitado a aprovar as permissões da API de registro do aplicativo Microsoft Entra.

    Captura de tela mostrando as permissões da API do aplicativo Microsoft Entra.

  4. Feche a janela do navegador depois de concluir a autenticação.

    Captura de tela mostrando a autenticação concluída.

  5. No Visual Studio Code você deve ver o token do Microsoft Entra retornado.

    Captura de tela mostrando o token do Microsoft Entra retornado após executar a célula e fazer login.

  6. Adicione outra célula do bloco de notas e insira este código.

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. Execute a célula do bloco de anotações, você verá duas linhas impressas à medida que o trabalho em lote do Livy é criado.

    Captura de tela mostrando os resultados da criação da sessão em lote.

Enviar uma declaração de spark.sql usando a sessão em lote da API Livy

  1. Adicione outra célula do bloco de notas e insira este código.

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. Execute a célula do bloco de anotações, você verá várias linhas impressas à medida que o trabalho do Livy Batch é criado e executado.

    Captura de tela mostrando resultados no Visual Studio Code após o Livy Batch Job ter sido enviado com êxito.

  3. Navegue de volta à sua Lakehouse para ver as alterações.

Exibir seus trabalhos no hub de monitoramento

Você pode acessar o hub de monitoramento para visualizar várias atividades do Apache Spark selecionando Monitor nos links de navegação do lado esquerdo.

  1. Quando o trabalho em lote estiver concluído, você poderá exibir o status da sessão navegando até o Monitor.

    Captura de tela mostrando envios anteriores da API Livy no hub de monitoramento.

  2. Selecione e abra o nome da atividade mais recente.

    Captura de tela mostrando a atividade mais recente da API Livy no hub de monitoramento.

  3. Neste caso de sessão da API Livy, você pode ver o envio em lote anterior, os detalhes da execução, as versões do Spark e a configuração. Observe o status interrompido no canto superior direito.

    Captura de tela mostrando os detalhes mais recentes da atividade da API Livy no hub de monitoramento.

Para recapitular todo o processo, você precisa de um cliente remoto, como o Visual Studio Code, um token de aplicativo Microsoft Entra, URL de ponto de extremidade da API Livy, autenticação em sua Lakehouse, uma carga útil do Spark em sua Lakehouse e, finalmente, uma sessão de API Livy em lote.