Use a API do Livy para enviar e executar trabalhos em lote do Livy
Nota
A API Livy para engenharia de dados de malha está em visualização.
Aplica-se a:✅ Engenharia de Dados e Ciência de Dados no Microsoft Fabric
Envie trabalhos em lote do Spark usando a API do Livy para engenharia de dados de malha.
Pré-requisitos
Um cliente remoto, como o Visual Studio Code com Jupyter Notebooks, o PySpark e a Microsoft Authentication Library (MSAL) para Python.
Um token de aplicativo Microsoft Entra é necessário para acessar a API Fabric Rest. Registre um aplicativo com a plataforma de identidade da Microsoft.
Alguns dados em sua casa do lago, este exemplo usa NYC Taxi & Limousine Commission green_tripdata_2022_08 um arquivo de parquet carregado na casa do lago.
A API Livy define um ponto de extremidade unificado para operações. Substitua os espaços reservados {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} e {Fabric_LakehouseID} pelos valores apropriados ao seguir os exemplos deste artigo.
Configurar o código do Visual Studio para seu lote de API Livy
Selecione Lakehouse Settings no seu Fabric Lakehouse.
Navegue até a seção Livy endpoint .
Copie a cadeia de conexão do trabalho em lote (segunda caixa vermelha na imagem) para o código.
Navegue até o Centro de administração do Microsoft Entra e copie a ID do aplicativo (cliente) e a ID do diretório (locatário) para o código.
Crie uma carga útil do Spark e faça o upload para a sua Lakehouse
Crie um bloco de
.ipynb
anotações no Visual Studio Code e insira o seguinte códigoimport sys import os from pyspark.sql import SparkSession from pyspark.conf import SparkConf from pyspark.sql.functions import col if __name__ == "__main__": #Spark session builder spark_session = (SparkSession .builder .appName("livybatchdemo") .getOrCreate()) spark_context = spark_session.sparkContext spark_context.setLogLevel("DEBUG") targetLakehouse = spark_context.getConf().get("spark.targetLakehouse") if targetLakehouse is not None: print("targetLakehouse: " + str(targetLakehouse)) else: print("targetLakehouse is None") df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0") df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4)) deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions" df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
Salve o arquivo Python localmente. Esta carga útil de código Python contém duas instruções Spark que funcionam em dados em uma Lakehouse e precisam ser carregadas para sua Lakehouse. Você precisará do caminho ABFS da carga útil para referência no trabalho de lote da API Livy no Visual Studio Code e do nome da tabela Lakehouse na instrução SQL SELECT.
Carregue a carga útil do Python para a seção de arquivos da sua Lakehouse. > Obter dados > Carregue ficheiros > clique na caixa Ficheiros/entrada.
Depois que o arquivo estiver na seção Arquivos da sua Lakehouse, clique nos três pontos à direita do nome do arquivo da carga útil e selecione Propriedades.
Copie este caminho ABFS para a célula do Bloco de Anotações na etapa 1.
Criar uma sessão em lote do Livy API Spark
Crie um bloco de
.ipynb
anotações no Visual Studio Code e insira o código a seguir.from msal import PublicClientApplication import requests import time tenant_id = "<Entra_TenantID>" client_id = "<Entra_ClientID>" workspace_id = "<Fabric_WorkspaceID>" lakehouse_id = "<Fabric_LakehouseID>" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches" headers = {"Authorization": "Bearer " + access_token}
Execute a célula do bloco de anotações, um pop-up deve aparecer no seu navegador permitindo que você escolha a identidade com a qual entrar.
Depois de escolher a identidade com a qual entrar, você também será solicitado a aprovar as permissões da API de registro do aplicativo Microsoft Entra.
Feche a janela do navegador depois de concluir a autenticação.
No Visual Studio Code você deve ver o token do Microsoft Entra retornado.
Adicione outra célula do bloco de notas e insira este código.
# call get batch API get_livy_get_batch = livy_base_url get_batch_response = requests.get(get_livy_get_batch, headers=headers) if get_batch_response.status_code == 200: print("API call successful") print(get_batch_response.json()) else: print(f"API call failed with status code: {get_batch_response.status_code}") print(get_batch_response.text)
Execute a célula do bloco de anotações, você verá duas linhas impressas à medida que o trabalho em lote do Livy é criado.
Enviar uma declaração de spark.sql usando a sessão em lote da API Livy
Adicione outra célula do bloco de notas e insira este código.
# submit payload to existing batch session print('Submit a spark job via the livy batch API to ') newlakehouseName = "YourNewLakehouseName" create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items" create_lakehouse_payload = { "displayName": newlakehouseName, "type": 'Lakehouse' } create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload) print(create_lakehouse_response.json()) payload_data = { "name":"livybatchdemo_with"+ newlakehouseName, "file":"abfss://YourABFSPathToYourPayload.py", "conf": { "spark.targetLakehouse": "Fabric_LakehouseID" } } get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data) print("The Livy batch job submitted successful") print(get_batch_response.json())
Execute a célula do bloco de anotações, você verá várias linhas impressas à medida que o trabalho do Livy Batch é criado e executado.
Navegue de volta à sua Lakehouse para ver as alterações.
Exibir seus trabalhos no hub de monitoramento
Você pode acessar o hub de monitoramento para visualizar várias atividades do Apache Spark selecionando Monitor nos links de navegação do lado esquerdo.
Quando o trabalho em lote estiver concluído, você poderá exibir o status da sessão navegando até o Monitor.
Selecione e abra o nome da atividade mais recente.
Neste caso de sessão da API Livy, você pode ver o envio em lote anterior, os detalhes da execução, as versões do Spark e a configuração. Observe o status interrompido no canto superior direito.
Para recapitular todo o processo, você precisa de um cliente remoto, como o Visual Studio Code, um token de aplicativo Microsoft Entra, URL de ponto de extremidade da API Livy, autenticação em sua Lakehouse, uma carga útil do Spark em sua Lakehouse e, finalmente, uma sessão de API Livy em lote.