Использование API Livy для отправки и выполнения заданий сеанса
Примечание.
API Livy для Fabric Инжиниринг данных находится в предварительной версии.
Область применения:✅ Инжиниринг данных и Обработка и анализ данных в Microsoft Fabric
Отправка пакетных заданий Spark с помощью API Livy для Fabric Инжиниринг данных.
Необходимые компоненты
Емкость Fabric Premium или пробная версия с помощью Lakehouse.
Удаленный клиент, например Visual Studio Code с Jupyter Notebook, PySpark и библиотекой проверки подлинности Майкрософт (MSAL) для Python.
Для доступа к REST API Fabric требуется маркер приложения Microsoft Entra. Регистрация приложения с помощью платформы удостоверений Майкрософт.
Некоторые данные в озерном доме, в этом примере используются NYC Taxi и Лимузин комиссии green_tripdata_2022_08 файл parquet, загруженный в lakehouse.
API Livy определяет единую конечную точку для операций. Замените заполнители {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}и {Fabric_LakehouseID} соответствующими значениями при выполнении примеров в этой статье.
Настройка Visual Studio Code для сеанса API Livy
Выберите "Параметры Lakehouse" в Fabric Lakehouse.
Перейдите к разделу конечной точки Livy.
Скопируйте задание сеанса строка подключения (первое красное поле на изображении) в код.
Перейдите в Центр администрирования Microsoft Entra и скопируйте идентификатор приложения (клиента) и идентификатор каталога (клиента) в код.
Создание сеанса Spark API Livy
Создайте записную книжку
.ipynb
в Visual Studio Code и вставьте следующий код.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
Запустите ячейку записной книжки, всплывающее окно должно появиться в браузере, позволяющее выбрать удостоверение для входа.
После выбора удостоверения для входа вам также будет предложено утвердить разрешения API регистрации приложений Microsoft Entra.
Закройте окно браузера после завершения проверки подлинности.
В Visual Studio Code вы увидите возвращенный токен Microsoft Entra.
Добавьте еще одну ячейку записной книжки и вставьте этот код.
create_livy_session = requests.post(livy_base_url, headers=headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json())
Запустите ячейку записной книжки, вы увидите одну строку, напечатанную при создании сеанса Livy.
Вы можете убедиться, что сеанс Livy создается с помощью [просмотр заданий в центре мониторинга](#View задания в центре мониторинга).
Отправка инструкции spark.sql с помощью сеанса Spark API Livy
Добавьте еще одну ячейку записной книжки и вставьте этот код.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Запустите ячейку записной книжки, вы увидите несколько добавочных строк, напечатанных по мере отправки задания и возвращаемых результатов.
Отправка второй инструкции spark.sql с помощью сеанса Spark API Livy
Добавьте еще одну ячейку записной книжки и вставьте этот код.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Запустите ячейку записной книжки, вы увидите несколько добавочных строк, напечатанных по мере отправки задания и возвращаемых результатов.
Закройте сеанс Livy с третьей инструкцией
Добавьте еще одну ячейку записной книжки и вставьте этот код.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, headers=headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers=headers) print (response)
Просмотр заданий в центре мониторинга
Вы можете получить доступ к центру мониторинга для просмотра различных действий Apache Spark, выбрав монитор в левой части ссылок навигации.
Когда сеанс выполняется или находится в состоянии завершения, можно просмотреть состояние сеанса, перейдя к монитору.
Выберите и откройте последнее имя действия.
В этом случае сеанса API Livy можно просмотреть предыдущие отправки сеансов, сведения о выполнении, версиях Spark и конфигурации. Обратите внимание на остановленное состояние в правом верхнем углу.
Чтобы восстановить весь процесс, вам потребуется удаленный клиент, например Visual Studio Code, маркер приложения Microsoft Entra, URL-адрес конечной точки API Livy, проверка подлинности в Lakehouse и, наконец, API Сеанса Livy.