使用 Spark 查询 Cosmos DB 数据

已完成

为启用了分析存储的 Azure Cosmos DB 数据库添加链接服务后,可以使用它通过 Azure Synapse Analytics 工作区中的 Spark 池查询数据。

将 Azure Cosmos DB 分析数据加载到数据帧

为了对来自 Azure Cosmos DB 链接服务的数据进行初始浏览或快速分析,通常最简单的方法是使用 Spark 支持的语言(如 PySpark (Spark 特定的 Python 实现) 或 Scala (经常在 Spark 上的基于 Java 的语言))将数据从容器加载到数据帧中。

例如,以下 PySpark 代码可用于从使用 my_linked_service 链接服务连接到的 my-container 容器中的数据加载名为 df 的数据帧,并显示前 10 行数据:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

假设 my-container 容器用于存储类似于以下示例的项目:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

PySpark 代码输出应如下表所示:

_rid _ts productID productName id _etag
mjMaAL...== 1655414791 123 小组件 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

数据从容器中的分析存储加载,而不是从操作存储中加载;确保操作存储没有查询开销。 分析数据存储中的字段包括应用程序定义的字段(本例中为 productID 和 productName)和自动创建的元数据字段。

加载数据帧后,可以使用其本机方法浏览数据。 例如,以下代码创建一个新的数据帧,其中包含 productID 和 productName 列,按 productName 排序:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

此代码输出如下表所示:

productID productName
125 Thingumy
123 小组件
124 Wotsit
... ...

将数据帧写入Cosmos DB 容器

在大多数 HTAP 方案中,应使用链接服务从分析存储中将数据读入 Spark。 但是,可以将数据帧的内容写入容器,如以下示例所示:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

注意

将数据帧写入容器会更新操作存储,并可能会影响其性能。 然后,更改将同步到分析存储。

使用 Spark SQL 查询 Azure Cosmos DB 分析数据

Spark SQL 是一个 Spark API,它提供 Spark 池中的 SQL 语言语法和关系数据库语义。 可以使用 Spark SQL 为可以使用 SQL 查询的表定义元数据。

例如,以下代码基于前面示例中使用的假设容器创建名为 Products 的表:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

提示

代码开头的 %%sql 关键字是一个 magic,指示 Spark 池以 SQL 而不是默认语言(通常设置为 PySpark)运行代码。

通过使用此方法,可以在 Spark 池中创建一个逻辑数据库,然后可用于查询 Azure Cosmos DB 中的分析数据,以支持数据分析和报告工作负载,而不会影响 Azure Cosmos DB 帐户中的操作存储。