Упражнение. Интеграция записной книжки в конвейеры Azure Synapse
В этом уроке вы создадите записную книжку Spark Azure Synapse для анализа и преобразования данных, загруженных потоком данных для сопоставления, и сохранения данных в озере данных. Вы создадите ячейку параметра, принимающую строковый параметр, который определяет имя папки для данных, записываемых записной книжкой в озеро данных.
Затем вы добавите эту записную книжку в конвейер Synapse и передадите уникальный идентификатор выполнения конвейера в параметр записной книжки, чтобы впоследствии можно было сопоставить выполнение конвейера с данными, сохраненными действием записной книжки.
В завершение вы с помощью концентратора мониторинга в Synapse Studio отследите выполнение конвейера, получите идентификатор выполнения, а затем найдете соответствующие файлы, хранящиеся в озере данных.
Об Apache Spark и записных книжках
Apache Spark — это платформа параллельной обработки, которая поддерживает обработку в памяти, чтобы повысить производительность приложений для анализа больших данных. Apache Spark в Azure Synapse Analytics — это одна из реализаций Apache Spark в облаке, предоставляемая корпорацией Майкрософт.
Записная книжка Apache Spark в Synapse Studio — это веб-интерфейс, позволяющий создавать файлы, содержащие реальный код, визуализации и текст описания. Записные книжки отлично подходят для проверки идей и использования быстрых экспериментов, чтобы получить аналитические сведения по данным. Они также широко используются при подготовке и визуализации данных, машинном обучении и других сценариях с большими данными.
Создание записной книжки Synapse Spark
Предположим, что вы создали поток данных для сопоставления в Synapse Analytics для обработки, объединения и импорта данных профиля пользователей. Теперь вы хотите найти первые пять продуктов для каждого пользователя, исходя из того, какие из них являются предпочтительными и популярными и какие имеют наибольшее число покупок за последние 12 месяцев. Затем вы хотите подсчитать пять лучших продуктов в сумме.
В этом упражнении вы создадите записную книжку Synapse Spark для выполнения этих вычислений.
Откройте Synapse Analytics Studio (https://web.azuresynapse.net/), а затем перейдите в центр Данные.
Выберите вкладку Связанные (1) и разверните основную учетную запись хранения озера данных (2) в разделе Azure Data Lake Storage 2-го поколения. Выберите контейнер wwi-02 (3) и откройте папку top-products (4). Щелкните правой кнопкой мыши любой файл Parquet (5), выберите пункт меню Создать записную книжку (6), а затем выберите Загрузить в DataFrame (7). Если папка не отображается, нажмите кнопку
Refresh
.Убедитесь, что записная книжка подключена к пулу Spark.
Замените имя файла Parquet на
*.parquet
(1), чтобы выбрать все файлы Parquet в папкеtop-products
. Например, путь должен иметь следующий вид:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Нажмите кнопку Запустить все на панели инструментов записной книжки, чтобы выполнить операции.
Примечание.
Synapse создает новый сеанс при первом запуске записной книжки в пуле Spark. Этот процесс занимает около 3–5 минут.
Примечание.
Если нужно выполнить только ячейку, наведите на нее курсор мыши и щелкните значок слева Выполнить ячейку либо выделите ячейку и нажмите клавиши CTRL+ВВОД.
Создайте новую ячейку ниже, нажав кнопку + и выбрав элемент Ячейка кода. Кнопка + находится под ячейкой записной книжки слева. Кроме того, можно также развернуть меню + Ячейка на панели инструментов записной книжки и выбрать элемент Ячейка кода.
Выполните следующую команду в новой ячейке, чтобы заполнить новый кадр данных с именем
topPurchases
, создать новое временное представление с именемtop_purchases
и показать первые 100 строк:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
Результат должен выглядеть следующим образом:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Выполните следующую команду в новой ячейке, чтобы создать новое временное представление с помощью SQL:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Примечание.
Для этого запроса ничего не выводится на экран.
Запрос использует временное представление
top_purchases
в качестве источника и применяет методrow_number() over
для применения номера строки к записям каждого пользователя, гдеItemsPurchasedLast12Months
— это наибольшее значение. Предложениеwhere
фильтрует результаты, поэтому мы получаем не более пяти продуктов, для которых поляIsTopProduct
иIsPreferredProduct
имеют значение true. Это дает нам пять наиболее покупаемых продуктов для каждого пользователя, причем эти продукты также определены в качестве избранных продуктов в соответствии с профилями пользователей, хранящимися в Azure Cosmos DB.Выполните следующую команду в новой ячейке, чтобы создать и отобразить новый кадр данных, в котором хранятся результаты временного представления
top_5_products
, созданного в предыдущей ячейке:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
Должен отобразиться результат, аналогичный приведенному ниже, с первыми пятью предпочитаемыми продуктами для каждого пользователя:
Вычислите сводку по первым пяти продуктам, которые являются предпочтительными для клиентов и имеют наибольшие продажи. Для этого выполните следующую команду в новой ячейке:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
В этой ячейке мы группируем первые пять наиболее предпочтительных продуктов по идентификатору продукта, суммируем итоговые продажи товаров за последние 12 месяцев, сортируем по убыванию и возвращаем пять первых результатов. Выходные данные должны быть аналогичны приведенным ниже:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Создание ячейки параметра
Конвейеры Azure Synapse ищут ячейку параметров и используют ее как значения по умолчанию для параметров, передаваемых во время выполнения. Подсистема выполнения добавит новую ячейку под ячейкой с входными параметрами, чтобы перезаписать значения по умолчанию. Если ячейка параметров не назначена, то внедряемая ячейка вставляется в начало записной книжки.
Мы будем выполнять эту записную книжку из конвейера. Мы хотим передать параметр, задающий значение переменной
runId
, которое будет использоваться для именования файла Parquet. Выполните следующую команду в новой ячейке:import uuid # Generate random GUID runId = uuid.uuid4()
Мы используем библиотеку
uuid
, входящую в состав Spark, для создания случайного идентификатора GUID. Мы хотим переопределить переменнуюrunId
значением параметра, передаваемого в конвейере. Для этого нам нужно переключить это на ячейку параметра.Щелкните многоточие действия (…) в правом верхнем углу ячейки (1), а затем выберите Переключить ячейку параметра (2).
После переключения этого параметра в ячейке отобразится тег Parameters.
Вставьте следующий код в новую ячейку, чтобы использовать переменную
runId
в качестве имени файла Parquet в пути/top5-products/
в основной учетной записи озера данных. ЗаменитеYOUR_DATALAKE_NAME
путь именем основной учетной записи озера данных. Чтобы найти это имя, прокрутите страницу вверх до ячейки Cell 1 вверху страницы (1). Скопируйте учетную запись хранения озера данных из пути (2). Вставьте это значение, заменивYOUR_DATALAKE_NAME
в пути (3) в новой ячейке, а затем выполните команду в этой ячейке.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Проверьте, что файл был записан в озеро данных. Перейдите в центр Данные и выберите вкладку Связанные (1). Разверните основную учетную запись хранения озера данных и выберите контейнер wwi-02 (2). Перейдите в папку top5-products (3). Должна отобразиться папка для файла Parquet в каталоге с идентификатором GUID в качестве имени файла (4).
Метод записи Parquet в кадре данных в ячейке записной книжки создал этот каталог, так как он ранее не существовал.
Добавление записной книжки в конвейер Synapse
Вернемся к потоку данных для сопоставления, который мы описали в начале упражнения. Допустим, вы хотите выполнить эту записную книжку после выполнения потока данных в рамках процесса оркестрации. Для этого добавьте эту записную книжку в конвейер в качестве нового действия записной книжки.
Вернитесь к записной книжке. Нажмите кнопку свойств (1) в правом верхнем углу записной книжки, а затем введите в поле Имя (2) значение
Calculate Top 5 Products
.Нажмите кнопку Добавить в конвейер (1) в правом верхнем углу записной книжки, а затем выберите пункт Существующий конвейер (2).
Выберите конвейер Write User Profile Data to ASA (1), а затем нажмите кнопку Добавить *(2).
Synapse Studio добавит действие записной книжки в конвейер. Переместите действие записной книжки так, чтобы оно находилось справа от действия потока данных. Выберите действие потока данных и перетащите зеленое поле связи конвейера успешного действия в действие записной книжки.
Стрелка действия при успешном выполнении указывает конвейеру выполнить действие записной книжки после успешного выполнения действия потока данных.
Выберите действие записной книжки (1), перейдите на вкладку Параметры (2), разверните узел Основные параметры (3)и нажмите + Создать (4). Введите
runId
в поле Имя (5). Выберите Строку в качестве типа (6). В поле Значение выберите Добавить динамическое содержимое (7).Выберите Идентификатор выполнения конвейера в разделе Системные переменные (1). В результате в поле динамического содержимого добавится
@pipeline().RunId
(2). Нажмите кнопку Готово (3), чтобы закрыть диалоговое окно.Значение идентификатора выполнения конвейера — это уникальный идентификатор GUID, назначаемый каждому выполнению конвейера. Мы будем использовать это значение для имени файла Parquet, передав его в качестве параметра
runId
записной книжки. Затем мы можем просмотреть журнал выполнения конвейера и найти конкретный файл Parquet, создаваемый для каждого выполнения конвейера.Нажмите Опубликовать все, а затем Опубликовать, чтобы сохранить изменения.
После завершения публикации нажмите Добавить триггер (1), а затем Активировать немедленно (2), чтобы запустить обновленный конвейер.
Нажмите кнопку ОК, чтобы выполнить триггер.
Мониторинг конвейера
Центр Мониторинг позволяет отслеживать текущие и прошлые действия для SQL, Apache Spark и конвейеров.
Перейдите на вкладку Мониторинг.
Выберите Запуски конвейера (1) и дождитесь успешного завершения выполнения конвейера (2). Может потребоваться обновить представление (3).
Выберите имя конвейера для просмотра выполнения действий конвейера.
Обратите внимание и на действие потока данных, и на новое действие записной книжки (1). Запишите значение идентификатора выполнения конвейера (2). Мы сравним его с именем файла Parquet, созданным записной книжкой. Выберите имя записной книжки Calculate Top 5 Products, чтобы просмотреть сведения о ней (3).
Здесь отображаются сведения о запуске записной книжки. Можно нажать кнопку Воспроизведение (1), чтобы просмотреть ход выполнения заданий (2). В нижней части окна можно просмотреть сведения о диагностике и журналы и отфильтровать по различным параметрам (3). Справа можно просмотреть сведения о выполнении, такие как длительность, идентификатор Livy, сведения о пуле Spark и т. д. Выберите ссылку Просмотреть сведения на задании, чтобы просмотреть сведения о нем (5).
Пользовательский интерфейс приложения Spark откроется в новой вкладке, где можно просмотреть сведения об этапе. Разверните раздел Визуализация DAG, чтобы просмотреть сведения об этапе.
Вернитесь в центр Данные.
Перейдите на вкладку Связанные (1), выберите контейнер wwi-02 (2) в основной учетной записи хранилища озера данных, перейдите в папку top5-products (3) и проверьте, что там есть папка для файла Parquet, имя которой совпадает с идентификатором выполнения конвейера.
Как видите, у нас есть файл, имя которого совпадает с идентификатором выполнения конвейера, который мы записали ранее.
Эти значения совпадают, так как мы передали идентификатор выполнения конвейера в параметр
runId
в действии записной книжки.