Копирование данных из Azure Cosmos DB в выделенный пул SQL с помощью Apache Spark
С помощью Azure Synapse Link для Azure Cosmos DB пользователи могут запускать аналитику почти в реальном времени по операционным данным в Azure Cosmos DB. Однако в некоторых ситуациях определенные данные необходимо агрегировать и обогащать для обслуживания пользователей хранилища данных. Для курирования и экспорта данных Azure Synapse Link достаточно всего нескольких ячеек в записной книжке.
Необходимые компоненты
- Подготовьте рабочую область Synapse с помощью:
- Подготовка учетной записи Azure Cosmos DB с контейнером HTAP с данными
- Подключите контейнер HTAP Azure Cosmos DB к рабочей области
- Настройте надлежащим образом импорт данных в выделенный пул SQL из Spark.
Шаги
В этом руководстве показано, как подключиться к аналитическому хранилищу. Описанные здесь действия не влияют на хранилище транзакций (для их выполнения единицы запросов не потребляются). Мы выполним следующие действия:
- Чтение контейнера HTAP Azure Cosmos DB в кадр данных Spark
- Агрегирование результатов в новый кадр данных
- Прием данных в выделенный пул SQL
Data
В этом примере мы используем контейнер HTAP с именем RetailSales. Это часть связанной службы с именем ConnectedData со следующей схемой:
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- advertising: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: string (nullable = true)
Для отчетности мы будем агрегировать продажи (quantity, revenue (price x quantity) по productCode и weekStarting. Наконец, мы экспортируем эти данные в выделенную таблицу dbo.productsales
пула SQL.
Настройка записной книжки Spark
Создайте записную книжку Spark, используя в качестве основного языка с Scala на Spark (Scala). Мы используем для сеанса заданный по умолчанию параметр записной книжки.
Чтение данных в Spark
Чтение контейнера HTAP Azure Cosmos DB с помощью Spark в кадр данных в первой ячейке.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Агрегирование результатов в новый кадр данных
Во второй ячейке будут выполнятся преобразование и статистические вычисления, которые необходимо выполнить для нового кадра данных до его загрузки в базу данных выделенного пула SQL.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Загрузка результатов в выделенный пул SQL
В третьей ячейке данные загружаются в выделенный пул SQL. При этом автоматически создается временная внешняя таблица, внешний источник данных и формат внешнего файла, которые будут удалены после завершения задания.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Запрос результатов с помощью SQL
Вы можете запросить результат, используя простой SQL-запрос, например так, как в следующем скрипте SQL:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Запрос отобразит следующие результаты в режиме диаграммы: