Поделиться через


Отправка и выполнение пакетных заданий Livy с помощью API Livy

Примечание.

API Livy для Fabric Инжиниринг данных находится в предварительной версии.

Область применения:✅ Инжиниринг данных и Обработка и анализ данных в Microsoft Fabric

Отправка пакетных заданий Spark с помощью API Livy для Fabric Инжиниринг данных.

Необходимые компоненты

API Livy определяет единую конечную точку для операций. Замените заполнители {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}и {Fabric_LakehouseID} соответствующими значениями при выполнении примеров в этой статье.

Настройка Visual Studio Code для пакетной службы API Livy

  1. Выберите "Параметры Lakehouse" в Fabric Lakehouse.

    Снимок экрана: параметры Lakehouse.

  2. Перейдите к разделу конечной точки Livy.

    Снимок экрана: конечная точка Lakehouse Livy и задание сеанса строка подключения.

  3. Скопируйте задание пакетной службы строка подключения (второе красное поле на изображении) в код.

  4. Перейдите в Центр администрирования Microsoft Entra и скопируйте идентификатор приложения (клиента) и идентификатор каталога (клиента) в код.

    Снимок экрана: обзор приложения API Livy в Центре администрирования Microsoft Entra.

Создание полезных данных Spark и отправка в Lakehouse

  1. Создание записной книжки .ipynb в Visual Studio Code и вставка следующего кода

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Сохраните файл Python локально. Эта полезные данные кода Python содержат два оператора Spark, которые работают с данными в Lakehouse и должны быть отправлены в Lakehouse. Вам потребуется путь полезных данных ABFS, чтобы использовать в пакетном задании API Livy в Visual Studio Code, и имя таблицы Lakehouse в операторе SQL SELECT.

    Снимок экрана: ячейка полезных данных Python.

  3. Отправьте полезные данные Python в раздел файлов в Lakehouse. >Получите файлы > отправки данных > в поле "Файлы/ входные данные".

    Снимок экрана: полезные данные в разделе

  4. После того как файл находится в разделе "Файлы" в Lakehouse, щелкните три точки справа от имени файла полезных данных и выберите "Свойства".

    Снимок экрана: путь ABFS полезных данных в свойствах файла в Lakehouse.

  5. Скопируйте этот путь ABFS в ячейку записной книжки на шаге 1.

Создание пакетного сеанса API Livy Spark

  1. Создайте записную книжку .ipynb в Visual Studio Code и вставьте следующий код.

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. Запустите ячейку записной книжки, всплывающее окно должно появиться в браузере, позволяющее выбрать удостоверение для входа.

    Снимок экрана: экран входа в приложение Microsoft Entra.

  3. После выбора удостоверения для входа вам также будет предложено утвердить разрешения API регистрации приложений Microsoft Entra.

    Снимок экрана: разрешения API приложений Microsoft Entra.

  4. Закройте окно браузера после завершения проверки подлинности.

    Снимок экрана: проверка подлинности завершена.

  5. В Visual Studio Code вы увидите возвращенный токен Microsoft Entra.

    Снимок экрана: маркер Microsoft Entra, возвращенный после выполнения ячейки и входа в систему.

  6. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. Запустите ячейку записной книжки, вы увидите две строки, напечатанные при создании пакетного задания Livy.

    Снимок экрана: результаты создания пакетного сеанса.

Отправка инструкции spark.sql с помощью пакетного сеанса API Livy

  1. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. Запустите ячейку записной книжки, вы увидите несколько строк, которые печатаются при создании и запуске задания пакетной службы Livy.

    Снимок экрана: результаты в Visual Studio Code после успешной отправки пакетного задания Livy.

  3. Вернитесь к Lakehouse, чтобы увидеть изменения.

Просмотр заданий в центре мониторинга

Вы можете получить доступ к центру мониторинга для просмотра различных действий Apache Spark, выбрав монитор в левой части ссылок навигации.

  1. После завершения пакетного задания можно просмотреть состояние сеанса, перейдя к монитору.

    Снимок экрана: предыдущие отправки API Livy в центре мониторинга.

  2. Выберите и откройте последнее имя действия.

    Снимок экрана: последнее действие API Livy в центре мониторинга.

  3. В этом случае сеанса API Livy можно просмотреть предыдущую пакетную отправку, сведения о выполнении, версии Spark и конфигурацию. Обратите внимание на остановленное состояние в правом верхнем углу.

    снимок экрана: последние сведения о действиях API Livy в центре мониторинга.

Чтобы восстановить весь процесс, вам нужен удаленный клиент, например Visual Studio Code, маркер приложения Microsoft Entra, URL-адрес конечной точки API Livy, проверка подлинности в Lakehouse, полезные данные Spark в Lakehouse и, наконец, сеанс API пакетной службы Livy.