Запрос данных Cosmos DB с помощью Spark
После добавления связанной службы для базы данных Azure Cosmos DB с поддержкой аналитического хранилища его можно использовать для запроса данных с помощью пула Spark в рабочей области Azure Synapse Analytics.
Загрузка аналитических данных Azure Cosmos DB в кадр данных
Для первоначального изучения или быстрого анализа данных из связанной службы Azure Cosmos DB часто проще загружать данные из контейнера в кадр данных с помощью поддерживаемого Spark языка, например PySpark (реализация Python для конкретной spark) или Scala (язык на основе Java, часто используемый в Spark).
Например, следующий код PySpark можно использовать для загрузки кадра данных с именем df из данных в контейнере my-container, подключенном к связанной службе my_linked_service, и отображения первых 10 строк данных:
df = spark.read
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.load()
display(df.limit(10))
Предположим, контейнер my-container используется для хранения элементов, аналогичных следующему примеру:
{
"productID": 123,
"productName": "Widget",
"id": "7248f072-11c3-42b1-a368-...",
"_rid": "mjMaAL...==",
"_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
"_etag": "\"54004b09-0000-2300-...\"",
"_attachments": "attachments/",
"_ts": 1655414791
}
Выходные данные кода PySpark должны быть похожи указанные в следующей таблице:
_rid | _ts | productID | productName | id | _etag |
---|---|---|---|---|---|
mjMaAL...== | 1655414791 | 123 | Мини-приложение | 7248f072-11c3-42b1-a368-... | 54004b09-0000-2300-... |
mjMaAL...== | 1655414829 | 124 | Wotsit | dc33131c-65c7-421a-a0f7-... | 5400ca09-0000-2300-... |
mjMaAL...== | 1655414835 | 125 | Thingumy | ce22351d-78c7-428a-a1h5-... | 5400ca09-0000-2300-... |
... | ... | ... | ... | ... | ... |
Данные загружаются из аналитического хранилища в контейнере, а не из операционного хранилища, что обеспечивает отсутствие дополнительных затрат на запросы в операционном хранилище. Поля в хранилище аналитических данных включают определенные приложением поля (в данном случае productID и productName) и автоматически созданные поля метаданных.
После загрузки кадра данных вы можете использовать собственные методы для изучения данных. Например, следующий код создает новый кадр данных, содержащий только столбцы productID и productName, упорядоченные по productName:
products_df = df.select("productID", "productName").orderBy("productName")
display(products_df.limit(10))
Выходные данные этого кода будут выглядеть примерно так:
productID | productName |
---|---|
125 | Thingumy |
123 | Мини-приложение |
124 | Wotsit |
... | ... |
Запись кадра данных в контейнер Cosmos DB
В большинстве сценариев HTAP следует использовать связанную службу для чтения данных в Spark из аналитического хранилища. Однако содержимое кадра данных можно записать в контейнер, как показано в следующем примере:
mydf.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.mode('append')\
.save()
Примечание.
Запись кадра данных в контейнер обновляет рабочее хранилище и может повлиять на его производительность. Затем изменения синхронизируются с аналитическим хранилищем.
Использование Spark SQL для запроса аналитических данных Azure Cosmos DB
Spark SQL — это API Spark, который предоставляет синтаксис языка SQL и семантику реляционной базы данных в пуле Spark. Spark SQL можно использовать для определения метаданных таблиц, которые можно запрашивать с помощью SQL.
Например, следующий код создает таблицу Products на основе гипотетического контейнера, используемого в предыдущих примерах:
%%sql
-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;
USE mydb;
-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
spark.synapse.linkedService 'my_linked_service',
spark.cosmos.container 'my-container'
);
-- Query the table
SELECT productID, productName
FROM products;
Совет
Ключевое слово %%sql
в начале кода — это магическая команда, которая предписывает пулу Spark выполнять код как SQL, а не как язык по умолчанию (обычно используется PySpark).
С помощью этого подхода можно создать логическую базу данных в пуле Spark, которую затем можно использовать для запроса аналитических данных в Azure Cosmos DB для поддержки анализа данных и создания отчетов рабочих нагрузок, не влияя на рабочее хранилище в учетной записи Azure Cosmos DB.