Поделиться через


Использование API Livy для отправки и выполнения заданий сеанса

Примечание.

API Livy для Fabric Инжиниринг данных находится в предварительной версии.

Область применения:✅ Инжиниринг данных и Обработка и анализ данных в Microsoft Fabric

Отправка пакетных заданий Spark с помощью API Livy для Fabric Инжиниринг данных.

Необходимые компоненты

API Livy определяет единую конечную точку для операций. Замените заполнители {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}и {Fabric_LakehouseID} соответствующими значениями при выполнении примеров в этой статье.

Настройка Visual Studio Code для сеанса API Livy

  1. Выберите "Параметры Lakehouse" в Fabric Lakehouse.

    Снимок экрана: параметры Lakehouse.

  2. Перейдите к разделу конечной точки Livy.

    Снимок экрана: конечная точка Lakehouse Livy и задание сеанса строка подключения.

  3. Скопируйте задание сеанса строка подключения (первое красное поле на изображении) в код.

  4. Перейдите в Центр администрирования Microsoft Entra и скопируйте идентификатор приложения (клиента) и идентификатор каталога (клиента) в код.

    Снимок экрана: обзор приложения API Livy в Центре администрирования Microsoft Entra.

Создание сеанса Spark API Livy

  1. Создайте записную книжку .ipynb в Visual Studio Code и вставьте следующий код.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. Запустите ячейку записной книжки, всплывающее окно должно появиться в браузере, позволяющее выбрать удостоверение для входа.

    Снимок экрана: экран входа в приложение Microsoft Entra.

  3. После выбора удостоверения для входа вам также будет предложено утвердить разрешения API регистрации приложений Microsoft Entra.

    Снимок экрана: разрешения API приложений Microsoft Entra.

  4. Закройте окно браузера после завершения проверки подлинности.

    Снимок экрана: проверка подлинности завершена.

  5. В Visual Studio Code вы увидите возвращенный токен Microsoft Entra.

    Снимок экрана: маркер Microsoft Entra, возвращенный после выполнения ячейки и входа в систему.

  6. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. Запустите ячейку записной книжки, вы увидите одну строку, напечатанную при создании сеанса Livy.

    Снимок экрана: результаты первого выполнения ячейки записной книжки.

  8. Вы можете убедиться, что сеанс Livy создается с помощью [просмотр заданий в центре мониторинга](#View задания в центре мониторинга).

Отправка инструкции spark.sql с помощью сеанса Spark API Livy

  1. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Запустите ячейку записной книжки, вы увидите несколько добавочных строк, напечатанных по мере отправки задания и возвращаемых результатов.

    Снимок экрана: результаты первой ячейки записной книжки с Spark.sql выполнения.

Отправка второй инструкции spark.sql с помощью сеанса Spark API Livy

  1. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Запустите ячейку записной книжки, вы увидите несколько добавочных строк, напечатанных по мере отправки задания и возвращаемых результатов.

    Снимок экрана: результаты выполнения второй ячейки записной книжки.

Закройте сеанс Livy с третьей инструкцией

  1. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

Просмотр заданий в центре мониторинга

Вы можете получить доступ к центру мониторинга для просмотра различных действий Apache Spark, выбрав монитор в левой части ссылок навигации.

  1. Когда сеанс выполняется или находится в состоянии завершения, можно просмотреть состояние сеанса, перейдя к монитору.

    Снимок экрана: предыдущие отправки API Livy в центре мониторинга.

  2. Выберите и откройте последнее имя действия.

    Снимок экрана: последнее действие API Livy в центре мониторинга.

  3. В этом случае сеанса API Livy можно просмотреть предыдущие отправки сеансов, сведения о выполнении, версиях Spark и конфигурации. Обратите внимание на остановленное состояние в правом верхнем углу.

    снимок экрана: последние сведения о действиях API Livy в центре мониторинга.

Чтобы восстановить весь процесс, вам потребуется удаленный клиент, например Visual Studio Code, маркер приложения Microsoft Entra, URL-адрес конечной точки API Livy, проверка подлинности в Lakehouse и, наконец, API Сеанса Livy.