Запрос данных Cosmos DB с помощью Spark

Завершено

После добавления связанной службы для базы данных Azure Cosmos DB с поддержкой аналитического хранилища его можно использовать для запроса данных с помощью пула Spark в рабочей области Azure Synapse Analytics.

Загрузка аналитических данных Azure Cosmos DB в кадр данных

Для первоначального изучения или быстрого анализа данных из связанной службы Azure Cosmos DB часто проще загружать данные из контейнера в кадр данных с помощью поддерживаемого Spark языка, например PySpark (реализация Python для конкретной spark) или Scala (язык на основе Java, часто используемый в Spark).

Например, следующий код PySpark можно использовать для загрузки кадра данных с именем df из данных в контейнере my-container, подключенном к связанной службе my_linked_service, и отображения первых 10 строк данных:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Предположим, контейнер my-container используется для хранения элементов, аналогичных следующему примеру:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

Выходные данные кода PySpark должны быть похожи указанные в следующей таблице:

_rid _ts productID productName id _etag
mjMaAL...== 1655414791 123 Мини-приложение 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

Данные загружаются из аналитического хранилища в контейнере, а не из операционного хранилища, что обеспечивает отсутствие дополнительных затрат на запросы в операционном хранилище. Поля в хранилище аналитических данных включают определенные приложением поля (в данном случае productID и productName) и автоматически созданные поля метаданных.

После загрузки кадра данных вы можете использовать собственные методы для изучения данных. Например, следующий код создает новый кадр данных, содержащий только столбцы productID и productName, упорядоченные по productName:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

Выходные данные этого кода будут выглядеть примерно так:

productID productName
125 Thingumy
123 Мини-приложение
124 Wotsit
... ...

Запись кадра данных в контейнер Cosmos DB

В большинстве сценариев HTAP следует использовать связанную службу для чтения данных в Spark из аналитического хранилища. Однако содержимое кадра данных можно записать в контейнер, как показано в следующем примере:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Примечание.

Запись кадра данных в контейнер обновляет рабочее хранилище и может повлиять на его производительность. Затем изменения синхронизируются с аналитическим хранилищем.

Использование Spark SQL для запроса аналитических данных Azure Cosmos DB

Spark SQL — это API Spark, который предоставляет синтаксис языка SQL и семантику реляционной базы данных в пуле Spark. Spark SQL можно использовать для определения метаданных таблиц, которые можно запрашивать с помощью SQL.

Например, следующий код создает таблицу Products на основе гипотетического контейнера, используемого в предыдущих примерах:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Совет

Ключевое слово %%sql в начале кода — это магическая команда, которая предписывает пулу Spark выполнять код как SQL, а не как язык по умолчанию (обычно используется PySpark).

С помощью этого подхода можно создать логическую базу данных в пуле Spark, которую затем можно использовать для запроса аналитических данных в Azure Cosmos DB для поддержки анализа данных и создания отчетов рабочих нагрузок, не влияя на рабочее хранилище в учетной записи Azure Cosmos DB.