使用 Spark 查詢 Cosmos DB 資料
針對已啟用分析存放區的 Azure Cosmos DB 資料庫,新增連結服務後,您可以使用連結服務,查詢使用 Azure Synapse Analytics 工作區中 Spark 集區的資料。
將 Azure Cosmos DB 分析資料載入資料框架
若要從 Azure Cosmos DB 連結服務初始探索或快速分析資料,通常最簡單的方式是使用 PySpark (Python 的 Spark 特定實作),或 Scala (Spark 上常用的 Java 型語言) 等Spark 支援語言,將資料從容器載入資料框架。
例如,下列 PySpark 程式碼可用來從使用 my_linked_service 連結服務連線 my-container 容器中的資料,載入名為 df 的資料框架,並顯示前 10 個資料列:
df = spark.read
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.load()
display(df.limit(10))
假設 my-container 容器用來儲存類似下列範例的項目:
{
"productID": 123,
"productName": "Widget",
"id": "7248f072-11c3-42b1-a368-...",
"_rid": "mjMaAL...==",
"_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
"_etag": "\"54004b09-0000-2300-...\"",
"_attachments": "attachments/",
"_ts": 1655414791
}
PySpark 程式碼的輸出會類似下表:
_rid | _ts | productID | productName | id | _etag |
---|---|---|---|---|---|
mjMaAL...== | 1655414791 | 123 | Widget | 7248f072-11c3-42b1-a368-... | 54004b09-0000-2300-... |
mjMaAL...== | 1655414829 | 124 | Wotsit | dc33131c-65c7-421a-a0f7-... | 5400ca09-0000-2300-... |
mjMaAL...== | 1655414835 | 125 | Thingumy | ce22351d-78c7-428a-a1h5-... | 5400ca09-0000-2300-... |
... | ... | ... | ... | ... | ... |
資料會從容器中的分析存放區載入,而不是作業存放區,請確保作業存放區沒有查詢額外負荷。 分析資料存放區中的欄位包含應用程式定義欄位 (在此案例中即 productID 和 productName),並自動建立中繼資料欄位。
載入資料框架後,您可以使用資料框架的原生方法探索資料。 例如,下列程式碼會建立新的資料框架,包含依 productName 排序的 productID 和 productName 資料行:
products_df = df.select("productID", "productName").orderBy("productName")
display(products_df.limit(10))
此程式碼的輸出會類似下表:
productID | productName |
---|---|
125 | Thingumy |
123 | Widget |
124 | Wotsit |
... | ... |
將資料框架寫入 Cosmos DB 容器
在大部分 HTAP 案例中,您應該使用連結服務,將資料從分析存放區讀取至 Spark。 但您可以將資料框架的內容寫入容器,如下列範例所示:
mydf.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.mode('append')\
.save()
注意
將資料框架寫入容器會更新作業存放區,並可能影響存放區的效能。 然後同步變更至分析存放區。
使用 Spark SQL 查詢 Azure Cosmos DB 分析資料
Spark SQL 是 Spark API,並在 Spark 集區中提供 SQL 語言語法和關聯式資料庫語意。 您可以使用 Spark SQL 針對可使用 SQL 查詢的資料表,定義中繼資料。
例如,下列程式碼會根據之前範例中使用的假設容器,建立名為 Products 的資料表:
%%sql
-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;
USE mydb;
-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
spark.synapse.linkedService 'my_linked_service',
spark.cosmos.container 'my-container'
);
-- Query the table
SELECT productID, productName
FROM products;
提示
程式碼開頭的 %%sql
關鍵字 magic 會指示 Spark 集區以 SQL 執行程式碼,而不是預設語言 (通常設為 PySpark)。
使用此方法,您可以在 Spark 集區中建立邏輯資料庫,然後用來查詢 Azure Cosmos DB 中的分析資料,並支援資料分析和報告工作負載,而不影響 Azure Cosmos DB 帳戶中的作業存放區。