Учебник. Реализация шаблона сохранения озера данных для обновления таблицы Databricks Delta
В этом руководстве описана обработка событий в учетной записи хранения с иерархическим пространством имен.
Вы создадите небольшое решение, которое позволяет пользователю заполнить таблицу Databricks Delta, загружая файл разделенных запятыми значений (CSV) с описанием заказа на продажу. Чтобы создать это решение, вы объедините подписку на Сетку событий, функцию Azure и задание в Azure Databricks.
При работе с этим руководством вы сделаете следующее:
- создавать событие в подписке на Сетку событий, которое вызывает функцию Azure;
- создавать функцию Azure, которая получает от события уведомление и запускает задание в Azure Databricks;
- создавать задание Databricks, в рамках которого заказ клиента вставляется в таблицу Databricks Delta в учетной записи хранения.
Мы будем создавать решение в обратном порядке, то есть начнем с рабочей области Azure Databricks.
Необходимые компоненты
Создайте учетную запись хранения с иерархическим пространством имен (Azure Data Lake Storage). В рамках этого руководства используется учетная запись хранения с именем
contosoorders
.См. статью "Создание учетной записи хранения для использования с Azure Data Lake Storage".
Убедитесь, что учетная запись пользователя содержит назначенную ей роль участника для данных хранилища BLOB-объектов.
Создайте субъект-службу, создайте секрет клиента и предоставьте субъекту-службе доступ к учетной записи хранения.
См . руководство по подключению к Azure Data Lake Storage (шаги 1–3). После выполнения этих действий обязательно вставьте идентификатор клиента, идентификатор приложения и значения секрета клиента в текстовый файл. Они вам скоро понадобятся.
Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.
Создать заказ на продажу
Сначала создайте CSV-файл с описанием заказа на продажу и отправьте его в учетную запись хранения. Позже вы примените данные из этого файла для заполнения первой строки в таблице Databricks Delta.
Войдите в новую учетную запись хранения на портале Azure.
Выберите контейнер "Контейнеры BLOB-объектов> хранилища"> и создайте контейнер с именем контейнера.
В контейнере данных создайте каталог с именем входных данных.
Вставьте приведенный ниже текст в текстовый редактор.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
Сохраните этот файл на локальном компьютере и присвойте ему имя data.csv.
В браузере хранилища отправьте этот файл в входную папку.
Создание задания в Azure Databricks
В этом разделе описано выполнение таких задач:
- Создайте рабочую область Azure Databricks.
- Создайте записную книжку.
- создание и заполнение таблицы Databricks Delta;
- добавление кода для вставки строк в таблицу Databricks Delta;
- создание задания.
Создайте рабочую область Azure Databricks.
В этом разделе вы создадите рабочую область Azure Databricks с помощью портала Azure.
Создайте рабочую область Azure Databricks. Присвойте имя рабочей области
contoso-orders
. См. статью "Создание рабочей области Azure Databricks".Создание кластера. Присвойте кластеру
customer-order-cluster
имя. См. Создание кластера.Создайте записную книжку. Назовите записную книжку
configure-customer-table
и выберите Python в качестве языка записной книжки по умолчанию. См. статью "Создание записной книжки".
Создание и заполнение таблицы Databricks Delta
Скопируйте приведенный ниже блок кода и вставьте его в первую ячейку в новой записной книжке, но пока не выполняйте этот код.
В этом блоке кода замените значения заполнителей
appId
,password
иtenant
значениями, которые вы собрали при подготовке предварительных условий для этого руководства.dbutils.widgets.text('source_file', "", "Source File") spark.conf.set("fs.azure.account.auth.type", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>") spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>") spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token") adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/' inputPath = adlsPath + dbutils.widgets.get('source_file') customerTablePath = adlsPath + 'delta-tables/customers'
Этот код позволяет создать мини-приложение с именем source_file. Позже вы создадите функцию Azure, которая вызывает этот код и передает мини-приложению путь к файлу. Этот код также позволяет выполнить аутентификацию субъекта-службы в учетной записи хранения, а затем создать переменные для использования в других ячейках.
Примечание.
При настройке рабочей среды рассмотрите возможность сохранения ключа проверки подлинности в Azure Databricks. Затем в блоке кода замените ключ проверки подлинности ключом поиска.
Например, вместо использования строки кодаspark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
следует использовать строкуspark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"))
.
После завершения работы с этим руководством ознакомьтесь со статьей Azure Data Lake Storage на веб-сайте Azure Databricks, чтобы просмотреть примеры этого подхода.Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.
Скопируйте приведенный ниже блок кода и вставьте его в другую ячейку записной книжки. Затем нажмите сочетание клавиш SHIFT+ВВОД, чтобы выполнить этот блок кода.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType inputSchema = StructType([ StructField("InvoiceNo", IntegerType(), True), StructField("StockCode", StringType(), True), StructField("Description", StringType(), True), StructField("Quantity", IntegerType(), True), StructField("InvoiceDate", StringType(), True), StructField("UnitPrice", DoubleType(), True), StructField("CustomerID", IntegerType(), True), StructField("Country", StringType(), True) ]) rawDataDF = (spark.read .option("header", "true") .schema(inputSchema) .csv(adlsPath + 'input') ) (rawDataDF.write .mode("overwrite") .format("delta") .saveAsTable("customer_data", path=customerTablePath))
Этот код позволяет создать в учетной записи хранения таблицу Databricks Delta, а затем загрузить в нее начальные данные из ранее отправленного CSV-файла.
После успешного выполнения этого блока кода удалите его из записной книжки.
Добавление кода для вставки строк в таблицу Databricks Delta
Скопируйте приведенный ниже блок кода и вставьте его в другую ячейку, но пока не запускайте ее.
upsertDataDF = (spark .read .option("header", "true") .csv(inputPath) ) upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
Этот код позволяет вставлять во временное табличное представление данные из CSV-файла. Путь к этому CSV-файлу получен из входных данных мини-приложения, которое вы создали на предыдущем шаге.
Скопируйте и вставьте следующий блок кода в другую ячейку. Этот код объединяет содержимое временной таблицы с таблицей Databricks Delta.
%sql MERGE INTO customer_data cd USING customer_data_to_upsert cu ON cd.CustomerID = cu.CustomerID WHEN MATCHED THEN UPDATE SET cd.StockCode = cu.StockCode, cd.Description = cu.Description, cd.InvoiceNo = cu.InvoiceNo, cd.Quantity = cu.Quantity, cd.InvoiceDate = cu.InvoiceDate, cd.UnitPrice = cu.UnitPrice, cd.Country = cu.Country WHEN NOT MATCHED THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country) VALUES ( cu.InvoiceNo, cu.StockCode, cu.Description, cu.Quantity, cu.InvoiceDate, cu.UnitPrice, cu.CustomerID, cu.Country)
Создание задания
Создайте задание для запуска созданной ранее записной книжки. Позже вы создадите функцию Azure, которая запускает это задание при возникновении события.
Выберите "Создать> задание".
Присвойте заданию имя, выберите созданную записную книжку и кластер. Затем нажмите кнопку "Создать ", чтобы создать задание.
Создание функции Azure
Создайте функцию Azure, которая запускает это задание.
В рабочей области Azure Databricks щелкните имя пользователя Azure Databricks в верхней строке, а затем в раскрывающемся списке выберите "Параметры пользователя".
На вкладке "Маркеры доступа" выберите "Создать новый маркер".
Скопируйте отображаемый маркер и нажмите кнопку "Готово".
В верхнем углу рабочей области Databricks Delta щелкните значок "Люди", а затем щелкните Параметры пользователя.
Нажмите кнопку "Создать новый маркер " и нажмите кнопку "Создать ".
Не забудьте скопировать значение токена в надежное расположение. Этот токен потребуется функции Azure для аутентификации в Databricks, чтобы запустить задание.
На домашней странице или в меню портала Azure выберите Создать ресурс.
На странице Создать щелкните Вычислительные ресурсы>Приложение-функция.
На вкладке "Основы" на странице "Создание приложения-функции" выберите группу ресурсов, а затем измените или проверьте следующие параметры:
Параметр Значение Имя приложения-функции contosoorder Стек среды выполнения .NET Публикация Код Операционная система Windows Тип плана Потребление (бессерверный) Выберите Проверить и создать, а затем выберите Создать.
По завершении развертывания выберите "Перейти к ресурсу ", чтобы открыть страницу обзора приложения-функции.
В группе Параметры выберите Конфигурация.
На странице Параметры приложения нажмите кнопку Новый параметр приложения поочередно для каждого параметра.
Добавьте следующие параметры:
Имя настройки Значение DBX_INSTANCE Регион для рабочей области Databricks. Например: westus2.azuredatabricks.net
DBX_PAT Личный маркер доступа, который вы создали ранее. DBX_JOB_ID Уникальный идентификатор выполняемого задания. Нажмите кнопку "Сохранить", чтобы зафиксировать эти параметры.
В группе "Функции" выберите "Функции" и нажмите кнопку "Создать".
Щелкните Azure Event Grid Trigger (Триггер Сетки событий Azure).
Установите расширение Microsoft.Azure.WebJobs.Extensions.EventGrid, если появится такое предложение. Если потребуется установить расширение, повторно щелкните Azure Event Grid Trigger (Триггер Сетки событий Azure), чтобы создать функцию.
Появится область Новая функция.
В области "Создать функцию" назовите функцию UpsertOrder и нажмите кнопку "Создать".
Замените содержимое файла кода этим кодом и нажмите кнопку "Сохранить ":
#r "Azure.Messaging.EventGrid" #r "System.Memory.Data" #r "Newtonsoft.Json" #r "System.Text.Json" using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; using Newtonsoft.Json; using Newtonsoft.Json.Linq; private static HttpClient httpClient = new HttpClient(); public static async Task Run(EventGridEvent eventGridEvent, ILogger log) { log.LogInformation("Event Subject: " + eventGridEvent.Subject); log.LogInformation("Event Topic: " + eventGridEvent.Topic); log.LogInformation("Event Type: " + eventGridEvent.EventType); log.LogInformation(eventGridEvent.Data.ToString()); if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") { StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>(); if (fileData.Api == "FlushWithClose") { log.LogInformation("Triggering Databricks Job for file: " + fileData.Url); var fileUrl = new Uri(fileData.Url); var httpRequestMessage = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))), Headers = { { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)}, { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" } }, Content = new StringContent(JsonConvert.SerializeObject(new { job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process), notebook_params = new { source_file = String.Join("", fileUrl.Segments.Skip(2)) } })) }; var response = await httpClient.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); } } }
Этот код позволяет проанализировать сведения о возникшем событии хранилища и создать сообщение запроса с URL-адресом файла, вызвавшего событие. В составе этого сообщения функция передает значение в мини-приложение source_file, которое вы создали ранее. Код функции отправляет сообщение в задание Databricks и использует маркер, полученный ранее в качестве проверки подлинности.
Создание подписки Сетки событий
В этом разделе вы создадите подписку на службу "Сетка событий", которая вызывает функцию Azure при отправке файлов в учетную запись хранения.
Выберите "Интеграция", а затем на странице "Интеграция" выберите "Триггер сетки событий".
В области "Изменить триггер" назовите событие, а затем выберите "Создать подписку на события
eventGridEvent
".Примечание.
Имя
eventGridEvent
соответствует параметру, который передается в функцию Azure.На вкладке "Основы" на странице "Создание подписки на события" измените или проверьте следующие параметры:
Параметр Значение Имя. contoso-order-event-subscription Тип темы Storage account Исходный ресурс contosoorders Имя раздела системы <create any name>
Фильтр по типам событий Создание BLOB-объектов и удаление BLOB-объектов Выберите кнопку Создать.
Тестирование подписки на Сетку событий
Создайте файл с именем
customer-order.csv
, вставьте в него приведенный ниже код JSON и сохраните файл на локальном компьютере.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
С помощью Обозревателя службы хранилища отправьте этот файл в папку input в учетной записи хранения.
При отправке файла вызывается событие Microsoft.Storage.BlobCreated. Сетка событий уведомляет всех, кто подписался на это событие. В нашем примере единственным подписчиком является наша функция Azure. Эта функция Azure анализирует параметры события и определяет, какое событие произошло. Затем она передает URL-адрес файла в задание Databricks. В рамках задания Databricks файл считывается и в таблицу Databricks Delta, размещенную в учетной записи хранения, добавляется соответствующая строка.
Чтобы проверить успешность задания, просмотрите запуски для задания. Вы увидите состояние завершения. Дополнительные сведения о просмотре запусков для задания см. в разделе "Просмотр запусков для задания"
В новой ячейке книги выполните приведенный ниже запрос, чтобы просмотреть обновленную таблицу Databricks Delta.
%sql select * from customer_data
Возвращается таблица, которая содержит последнюю запись.
Чтобы обновить эту запись, создайте файл с именем
customer-order-update.csv
, вставьте в него приведенный ниже код и сохраните файл на локальном компьютере.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
Этот CSV-файл почти идентичен предыдущему, за исключением того, что количество в заказе изменилось с
228
на22
.С помощью Обозревателя службы хранилища отправьте этот файл в папку input в учетной записи хранения.
Снова выполните запрос
select
, чтобы просмотреть обновленную разностную таблицу.%sql select * from customer_data
Будет возвращена таблица, которая содержит обновленную запись.
Очистка ресурсов
Удалите группу ресурсов и все связанные с ней ресурсы, когда надобность в них отпадет. Для этого выберите группу ресурсов для учетной записи хранения и выберите Удалить.
Следующие шаги
Reacting to Blob storage events (preview) (Реагирование на события хранилища BLOB-объектов)