Spark を使用して Cosmos DB データに対してクエリを実行する
分析ストアが有効な Azure Cosmos DB データベースのリンク サービスを追加したら、それを使用し、Azure Synapse Analytics ワークスペースの Spark プールを使ってデータに対してクエリを実行できます。
Azure Cosmos DB 分析データをデータフレームに読み込む
Azure Cosmos DB リンク サービスのデータを最初に探索したり簡単に分析したりするときは、多くの場合、PySpark (Python の Spark 固有の実装) や Scala (Spark でよく使用される Java ベースの言語) などの Spark でサポートされる言語を使用し、コンテナーからデータフレームにデータを読み込むのが最も簡単です。
たとえば、次の PySpark コードを使って、df という名前のデータフレームを、接続されている my-container コンテナー内のデータから、my_linked_service リンク サービスを使って読み込み、データの最初の 10 行のデータを表示できます。
df = spark.read
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.load()
display(df.limit(10))
次の例のような項目を格納するために、my-container コンテナーを使用するとします。
{
"productID": 123,
"productName": "Widget",
"id": "7248f072-11c3-42b1-a368-...",
"_rid": "mjMaAL...==",
"_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
"_etag": "\"54004b09-0000-2300-...\"",
"_attachments": "attachments/",
"_ts": 1655414791
}
PySpark コードからの出力は、次の表のようになります。
_rid | _ts | productID | productName | id | _etag |
---|---|---|---|---|---|
mjMaAL...== | 1655414791 | 123 | ウィジェット | 7248f072-11c3-42b1-a368-... | 54004b09-0000-2300-... |
mjMaAL...== | 1655414829 | 124 | Wotsit | dc33131c-65c7-421a-a0f7-... | 5400ca09-0000-2300-... |
mjMaAL...== | 1655414835 | 125 | Thingumy | ce22351d-78c7-428a-a1h5-... | 5400ca09-0000-2300-... |
... | ... | ... | ... | ... | ... |
データは、運用ストアからではなく、コンテナー内の分析ストアから読み込まれます。これにより、運用ストアに対するクエリ オーバーヘッドをなくします。 分析データ ストアのフィールドには、アプリケーション定義フィールド (この場合は productID と productName) と、自動的に作成されたメタデータ フィールドが含まれています。
データフレームを読み込んだ後、そのネイティブ メソッドを使ってデータを探索できます。 たとえば、次のコードでは、productID 列と productName 列のみを含む、productName で並べ替えられた新しいデータフレームが作成されます。
products_df = df.select("productID", "productName").orderBy("productName")
display(products_df.limit(10))
このコードの出力は次の表のようになります。
productID | productName |
---|---|
125 | Thingumy |
123 | ウィジェット |
124 | Wotsit |
... | ... |
データフレームを Cosmos DB コンテナーに書き込む
ほとんどの HTAP シナリオでは、リンク サービスを使って、分析ストアから Spark にデータを読み取る必要があります。 ただし、次の例に示すように、データフレームの内容をコンテナーに書き込むことも "できます"。
mydf.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.mode('append')\
.save()
Note
コンテナーにデータフレームを書き込むと、"運用" ストアが更新され、そのパフォーマンスに影響を与えるおそれがあります。 その後、変更は分析ストアに同期されます。
Spark SQL を使って Azure Cosmos DB 分析データに対するクエリを実行する
Spark SQL は、Spark プールで SQL 言語の構文とリレーショナル データベースのセマンティクスを提供する Spark API です。 Spark SQL を使って、SQL を使ってクエリを実行できるテーブルのメタデータを定義できます。
たとえば、次のコードでは、前の例で使用した架空のコンテナーに基づいて Products という名前のテーブルが作成されます。
%%sql
-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;
USE mydb;
-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
spark.synapse.linkedService 'my_linked_service',
spark.cosmos.container 'my-container'
);
-- Query the table
SELECT productID, productName
FROM products;
ヒント
コードの先頭にある %%sql
キーワードは "マジック" であり、コードを既定の言語 (通常は PySpark に設定されます) ではなく SQL として実行するように Spark プールに指示します。
この方法を使用すると、Spark プールに論理データベースを作成できます。それを使って、Azure Cosmos DB 内の分析データに対してクエリを実行し、Azure Cosmos DB アカウントの運用ストアに影響を与えることなく、データ分析とレポート ワークロードをサポートできます。