Spark を使用して Cosmos DB データに対してクエリを実行する

完了

分析ストアが有効な Azure Cosmos DB データベースのリンク サービスを追加したら、それを使用し、Azure Synapse Analytics ワークスペースの Spark プールを使ってデータに対してクエリを実行できます。

Azure Cosmos DB 分析データをデータフレームに読み込む

Azure Cosmos DB リンク サービスのデータを最初に探索したり簡単に分析したりするときは、多くの場合、PySpark (Python の Spark 固有の実装) や Scala (Spark でよく使用される Java ベースの言語) などの Spark でサポートされる言語を使用し、コンテナーからデータフレームにデータを読み込むのが最も簡単です。

たとえば、次の PySpark コードを使って、df という名前のデータフレームを、接続されている my-container コンテナー内のデータから、my_linked_service リンク サービスを使って読み込み、データの最初の 10 行のデータを表示できます。

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

次の例のような項目を格納するために、my-container コンテナーを使用するとします。

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

PySpark コードからの出力は、次の表のようになります。

_rid _ts productID productName id _etag
mjMaAL...== 1655414791 123 ウィジェット 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

データは、運用ストアからではなく、コンテナー内の分析ストアから読み込まれます。これにより、運用ストアに対するクエリ オーバーヘッドをなくします。 分析データ ストアのフィールドには、アプリケーション定義フィールド (この場合は productIDproductName) と、自動的に作成されたメタデータ フィールドが含まれています。

データフレームを読み込んだ後、そのネイティブ メソッドを使ってデータを探索できます。 たとえば、次のコードでは、productID 列と productName 列のみを含む、productName で並べ替えられた新しいデータフレームが作成されます。

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

このコードの出力は次の表のようになります。

productID productName
125 Thingumy
123 ウィジェット
124 Wotsit
... ...

データフレームを Cosmos DB コンテナーに書き込む

ほとんどの HTAP シナリオでは、リンク サービスを使って、分析ストアから Spark にデータを読み取る必要があります。 ただし、次の例に示すように、データフレームの内容をコンテナーに書き込むことも "できます"。

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Note

コンテナーにデータフレームを書き込むと、"運用" ストアが更新され、そのパフォーマンスに影響を与えるおそれがあります。 その後、変更は分析ストアに同期されます。

Spark SQL を使って Azure Cosmos DB 分析データに対するクエリを実行する

Spark SQL は、Spark プールで SQL 言語の構文とリレーショナル データベースのセマンティクスを提供する Spark API です。 Spark SQL を使って、SQL を使ってクエリを実行できるテーブルのメタデータを定義できます。

たとえば、次のコードでは、前の例で使用した架空のコンテナーに基づいて Products という名前のテーブルが作成されます。

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

ヒント

コードの先頭にある %%sql キーワードは "マジック" であり、コードを既定の言語 (通常は PySpark に設定されます) ではなく SQL として実行するように Spark プールに指示します。

この方法を使用すると、Spark プールに論理データベースを作成できます。それを使って、Azure Cosmos DB 内の分析データに対してクエリを実行し、Azure Cosmos DB アカウントの運用ストアに影響を与えることなく、データ分析とレポート ワークロードをサポートできます。