使用 Spark 查询 Cosmos DB 数据
为启用了分析存储的 Azure Cosmos DB 数据库添加链接服务后,可以使用它通过 Azure Synapse Analytics 工作区中的 Spark 池查询数据。
将 Azure Cosmos DB 分析数据加载到数据帧
为了对来自 Azure Cosmos DB 链接服务的数据进行初始浏览或快速分析,通常最简单的方法是使用 Spark 支持的语言(如 PySpark (Spark 特定的 Python 实现) 或 Scala (经常在 Spark 上的基于 Java 的语言))将数据从容器加载到数据帧中。
例如,以下 PySpark 代码可用于从使用 my_linked_service 链接服务连接到的 my-container 容器中的数据加载名为 df 的数据帧,并显示前 10 行数据:
df = spark.read
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.load()
display(df.limit(10))
假设 my-container 容器用于存储类似于以下示例的项目:
{
"productID": 123,
"productName": "Widget",
"id": "7248f072-11c3-42b1-a368-...",
"_rid": "mjMaAL...==",
"_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
"_etag": "\"54004b09-0000-2300-...\"",
"_attachments": "attachments/",
"_ts": 1655414791
}
PySpark 代码输出应如下表所示:
_rid | _ts | productID | productName | id | _etag |
---|---|---|---|---|---|
mjMaAL...== | 1655414791 | 123 | 小组件 | 7248f072-11c3-42b1-a368-... | 54004b09-0000-2300-... |
mjMaAL...== | 1655414829 | 124 | Wotsit | dc33131c-65c7-421a-a0f7-... | 5400ca09-0000-2300-... |
mjMaAL...== | 1655414835 | 125 | Thingumy | ce22351d-78c7-428a-a1h5-... | 5400ca09-0000-2300-... |
... | ... | ... | ... | ... | ... |
数据从容器中的分析存储加载,而不是从操作存储中加载;确保操作存储没有查询开销。 分析数据存储中的字段包括应用程序定义的字段(本例中为 productID 和 productName)和自动创建的元数据字段。
加载数据帧后,可以使用其本机方法浏览数据。 例如,以下代码创建一个新的数据帧,其中包含 productID 和 productName 列,按 productName 排序:
products_df = df.select("productID", "productName").orderBy("productName")
display(products_df.limit(10))
此代码输出如下表所示:
productID | productName |
---|---|
125 | Thingumy |
123 | 小组件 |
124 | Wotsit |
... | ... |
将数据帧写入Cosmos DB 容器
在大多数 HTAP 方案中,应使用链接服务从分析存储中将数据读入 Spark。 但是,可以将数据帧的内容写入容器,如以下示例所示:
mydf.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.mode('append')\
.save()
注意
将数据帧写入容器会更新操作存储,并可能会影响其性能。 然后,更改将同步到分析存储。
使用 Spark SQL 查询 Azure Cosmos DB 分析数据
Spark SQL 是一个 Spark API,它提供 Spark 池中的 SQL 语言语法和关系数据库语义。 可以使用 Spark SQL 为可以使用 SQL 查询的表定义元数据。
例如,以下代码基于前面示例中使用的假设容器创建名为 Products 的表:
%%sql
-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;
USE mydb;
-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
spark.synapse.linkedService 'my_linked_service',
spark.cosmos.container 'my-container'
);
-- Query the table
SELECT productID, productName
FROM products;
提示
代码开头的 %%sql
关键字是一个 magic,指示 Spark 池以 SQL 而不是默认语言(通常设置为 PySpark)运行代码。
通过使用此方法,可以在 Spark 池中创建一个逻辑数据库,然后可用于查询 Azure Cosmos DB 中的分析数据,以支持数据分析和报告工作负载,而不会影响 Azure Cosmos DB 帐户中的操作存储。