Упражнение. Интеграция записной книжки в конвейеры Azure Synapse

Завершено

В этом уроке вы создадите записную книжку Spark Azure Synapse для анализа и преобразования данных, загруженных потоком данных для сопоставления, и сохранения данных в озере данных. Вы создадите ячейку параметра, принимающую строковый параметр, который определяет имя папки для данных, записываемых записной книжкой в озеро данных.

Затем вы добавите эту записную книжку в конвейер Synapse и передадите уникальный идентификатор выполнения конвейера в параметр записной книжки, чтобы впоследствии можно было сопоставить выполнение конвейера с данными, сохраненными действием записной книжки.

В завершение вы с помощью концентратора мониторинга в Synapse Studio отследите выполнение конвейера, получите идентификатор выполнения, а затем найдете соответствующие файлы, хранящиеся в озере данных.

Об Apache Spark и записных книжках

Apache Spark — это платформа параллельной обработки, которая поддерживает обработку в памяти, чтобы повысить производительность приложений для анализа больших данных. Apache Spark в Azure Synapse Analytics — это одна из реализаций Apache Spark в облаке, предоставляемая корпорацией Майкрософт.

Записная книжка Apache Spark в Synapse Studio — это веб-интерфейс, позволяющий создавать файлы, содержащие реальный код, визуализации и текст описания. Записные книжки отлично подходят для проверки идей и использования быстрых экспериментов, чтобы получить аналитические сведения по данным. Они также широко используются при подготовке и визуализации данных, машинном обучении и других сценариях с большими данными.

Создание записной книжки Synapse Spark

Предположим, что вы создали поток данных для сопоставления в Synapse Analytics для обработки, объединения и импорта данных профиля пользователей. Теперь вы хотите найти первые пять продуктов для каждого пользователя, исходя из того, какие из них являются предпочтительными и популярными и какие имеют наибольшее число покупок за последние 12 месяцев. Затем вы хотите подсчитать пять лучших продуктов в сумме.

В этом упражнении вы создадите записную книжку Synapse Spark для выполнения этих вычислений.

  1. Откройте Synapse Analytics Studio (https://web.azuresynapse.net/), а затем перейдите в центр Данные.

    Выделенный элемент меню

  2. Выберите вкладку Связанные (1) и разверните основную учетную запись хранения озера данных (2) в разделе Azure Data Lake Storage 2-го поколения. Выберите контейнер wwi-02 (3) и откройте папку top-products (4). Щелкните правой кнопкой мыши любой файл Parquet (5), выберите пункт меню Создать записную книжку (6), а затем выберите Загрузить в DataFrame (7). Если папка не отображается, нажмите кнопку Refresh.

    Выделенные файл Parquet и параметр создания записной книжки.

  3. Убедитесь, что записная книжка подключена к пулу Spark.

    Выделенный элемент меню для присоединения к пулу Spark.

  4. Замените имя файла Parquet на *.parquet (1), чтобы выбрать все файлы Parquet в папке top-products. Например, путь должен иметь следующий вид: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    Выделенное имя файла.

  5. Нажмите кнопку Запустить все на панели инструментов записной книжки, чтобы выполнить операции.

    Результаты ячейки.

    Примечание.

    Synapse создает новый сеанс при первом запуске записной книжки в пуле Spark. Этот процесс занимает около 3–5 минут.

    Примечание.

    Если нужно выполнить только ячейку, наведите на нее курсор мыши и щелкните значок слева Выполнить ячейку либо выделите ячейку и нажмите клавиши CTRL+ВВОД.

  6. Создайте новую ячейку ниже, нажав кнопку + и выбрав элемент Ячейка кода. Кнопка + находится под ячейкой записной книжки слева. Кроме того, можно также развернуть меню + Ячейка на панели инструментов записной книжки и выбрать элемент Ячейка кода.

    Выделен пункт меню добавления кода.

  7. Выполните следующую команду в новой ячейке, чтобы заполнить новый кадр данных с именем topPurchases, создать новое временное представление с именем top_purchases и показать первые 100 строк:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    Результат должен выглядеть следующим образом:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Выполните следующую команду в новой ячейке, чтобы создать новое временное представление с помощью SQL:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Примечание.

    Для этого запроса ничего не выводится на экран.

    Запрос использует временное представление top_purchases в качестве источника и применяет метод row_number() over для применения номера строки к записям каждого пользователя, где ItemsPurchasedLast12Months — это наибольшее значение. Предложение where фильтрует результаты, поэтому мы получаем не более пяти продуктов, для которых поля IsTopProduct и IsPreferredProduct имеют значение true. Это дает нам пять наиболее покупаемых продуктов для каждого пользователя, причем эти продукты также определены в качестве избранных продуктов в соответствии с профилями пользователей, хранящимися в Azure Cosmos DB.

  9. Выполните следующую команду в новой ячейке, чтобы создать и отобразить новый кадр данных, в котором хранятся результаты временного представления top_5_products, созданного в предыдущей ячейке:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Должен отобразиться результат, аналогичный приведенному ниже, с первыми пятью предпочитаемыми продуктами для каждого пользователя:

    Пять наиболее предпочтительных продуктов для каждого пользователя.

  10. Вычислите сводку по первым пяти продуктам, которые являются предпочтительными для клиентов и имеют наибольшие продажи. Для этого выполните следующую команду в новой ячейке:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    В этой ячейке мы группируем первые пять наиболее предпочтительных продуктов по идентификатору продукта, суммируем итоговые продажи товаров за последние 12 месяцев, сортируем по убыванию и возвращаем пять первых результатов. Выходные данные должны быть аналогичны приведенным ниже:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Создание ячейки параметра

Конвейеры Azure Synapse ищут ячейку параметров и используют ее как значения по умолчанию для параметров, передаваемых во время выполнения. Подсистема выполнения добавит новую ячейку под ячейкой с входными параметрами, чтобы перезаписать значения по умолчанию. Если ячейка параметров не назначена, то внедряемая ячейка вставляется в начало записной книжки.

  1. Мы будем выполнять эту записную книжку из конвейера. Мы хотим передать параметр, задающий значение переменной runId, которое будет использоваться для именования файла Parquet. Выполните следующую команду в новой ячейке:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    Мы используем библиотеку uuid, входящую в состав Spark, для создания случайного идентификатора GUID. Мы хотим переопределить переменную runId значением параметра, передаваемого в конвейере. Для этого нам нужно переключить это на ячейку параметра.

  2. Щелкните многоточие действия (…) в правом верхнем углу ячейки (1), а затем выберите Переключить ячейку параметра (2).

    Выделенный элемент меню.

    После переключения этого параметра в ячейке отобразится тег Parameters.

    Ячейка настроена для приема параметров.

  3. Вставьте следующий код в новую ячейку, чтобы использовать переменную runId в качестве имени файла Parquet в пути /top5-products/ в основной учетной записи озера данных. Замените YOUR_DATALAKE_NAME путь именем основной учетной записи озера данных. Чтобы найти это имя, прокрутите страницу вверх до ячейки Cell 1 вверху страницы (1). Скопируйте учетную запись хранения озера данных из пути (2). Вставьте это значение, заменив YOUR_DATALAKE_NAME в пути (3) в новой ячейке, а затем выполните команду в этой ячейке.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    Изменение в пути имени основной учетной записи озера данных.

  4. Проверьте, что файл был записан в озеро данных. Перейдите в центр Данные и выберите вкладку Связанные (1). Разверните основную учетную запись хранения озера данных и выберите контейнер wwi-02 (2). Перейдите в папку top5-products (3). Должна отобразиться папка для файла Parquet в каталоге с идентификатором GUID в качестве имени файла (4).

    Выделенный файл Parquet.

    Метод записи Parquet в кадре данных в ячейке записной книжки создал этот каталог, так как он ранее не существовал.

Добавление записной книжки в конвейер Synapse

Вернемся к потоку данных для сопоставления, который мы описали в начале упражнения. Допустим, вы хотите выполнить эту записную книжку после выполнения потока данных в рамках процесса оркестрации. Для этого добавьте эту записную книжку в конвейер в качестве нового действия записной книжки.

  1. Вернитесь к записной книжке. Нажмите кнопку свойств (1) в правом верхнем углу записной книжки, а затем введите в поле Имя (2) значение Calculate Top 5 Products.

    Окно свойств.

  2. Нажмите кнопку Добавить в конвейер (1) в правом верхнем углу записной книжки, а затем выберите пункт Существующий конвейер (2).

    Выделенная кнопка добавления в конвейер.

  3. Выберите конвейер Write User Profile Data to ASA (1), а затем нажмите кнопку Добавить *(2).

    Выделен конвейер.

  4. Synapse Studio добавит действие записной книжки в конвейер. Переместите действие записной книжки так, чтобы оно находилось справа от действия потока данных. Выберите действие потока данных и перетащите зеленое поле связи конвейера успешного действия в действие записной книжки.

    Выделенная зеленая стрелка.

    Стрелка действия при успешном выполнении указывает конвейеру выполнить действие записной книжки после успешного выполнения действия потока данных.

  5. Выберите действие записной книжки (1), перейдите на вкладку Параметры (2), разверните узел Основные параметры (3)и нажмите + Создать (4). Введите runId в поле Имя (5). Выберите Строку в качестве типа (6). В поле Значение выберите Добавить динамическое содержимое (7).

    Изображение параметров.

  6. Выберите Идентификатор выполнения конвейера в разделе Системные переменные (1). В результате в поле динамического содержимого добавится @pipeline().RunId (2). Нажмите кнопку Готово (3), чтобы закрыть диалоговое окно.

    Показана форма динамического содержимого.

    Значение идентификатора выполнения конвейера — это уникальный идентификатор GUID, назначаемый каждому выполнению конвейера. Мы будем использовать это значение для имени файла Parquet, передав его в качестве параметра runId записной книжки. Затем мы можем просмотреть журнал выполнения конвейера и найти конкретный файл Parquet, создаваемый для каждого выполнения конвейера.

  7. Нажмите Опубликовать все, а затем Опубликовать, чтобы сохранить изменения.

    Выделена кнопка

  8. После завершения публикации нажмите Добавить триггер (1), а затем Активировать немедленно (2), чтобы запустить обновленный конвейер.

    Выделенный элемент меню триггера.

  9. Нажмите кнопку ОК, чтобы выполнить триггер.

    Выделена кнопка

Мониторинг конвейера

Центр Мониторинг позволяет отслеживать текущие и прошлые действия для SQL, Apache Spark и конвейеров.

  1. Перейдите на вкладку Мониторинг.

    В меню выбран центр

  2. Выберите Запуски конвейера (1) и дождитесь успешного завершения выполнения конвейера (2). Может потребоваться обновить представление (3).

    Успешное выполнение конвейера.

  3. Выберите имя конвейера для просмотра выполнения действий конвейера.

    Выбрано имя конвейера.

  4. Обратите внимание и на действие потока данных, и на новое действие записной книжки (1). Запишите значение идентификатора выполнения конвейера (2). Мы сравним его с именем файла Parquet, созданным записной книжкой. Выберите имя записной книжки Calculate Top 5 Products, чтобы просмотреть сведения о ней (3).

    Отображаются сведения о выполнении конвейера.

  5. Здесь отображаются сведения о запуске записной книжки. Можно нажать кнопку Воспроизведение (1), чтобы просмотреть ход выполнения заданий (2). В нижней части окна можно просмотреть сведения о диагностике и журналы и отфильтровать по различным параметрам (3). Справа можно просмотреть сведения о выполнении, такие как длительность, идентификатор Livy, сведения о пуле Spark и т. д. Выберите ссылку Просмотреть сведения на задании, чтобы просмотреть сведения о нем (5).

    Показаны сведения о запуске.

  6. Пользовательский интерфейс приложения Spark откроется в новой вкладке, где можно просмотреть сведения об этапе. Разверните раздел Визуализация DAG, чтобы просмотреть сведения об этапе.

    Показаны сведения об этапе Spark.

  7. Вернитесь в центр Данные.

    Экран

  8. Перейдите на вкладку Связанные (1), выберите контейнер wwi-02 (2) в основной учетной записи хранилища озера данных, перейдите в папку top5-products (3) и проверьте, что там есть папка для файла Parquet, имя которой совпадает с идентификатором выполнения конвейера.

    Выделен файл.

    Как видите, у нас есть файл, имя которого совпадает с идентификатором выполнения конвейера, который мы записали ранее.

    Выделен идентификатор выполнения конвейера.

    Эти значения совпадают, так как мы передали идентификатор выполнения конвейера в параметр runId в действии записной книжки.