Apache Spark を使用して Azure Cosmos DB から専用 SQL プールにデータをコピーする
Azure Synapse Link for Azure Cosmos DB を使用すると、ユーザーは Azure Cosmos DB のオペレーショナル データに対してほぼリアルタイムの分析を実行できます。 ただし、データ ウェアハウス ユーザーにサービスを提供するために、一部のデータを集計してエンリッチメント処理することが必要な場合があります。 Azure Synapse Link のデータのキュレーションとエクスポートは、ノートブックの少数のセルで実行できます。
前提条件
- 以下を使用して、Synapse ワークスペースをプロビジョニングする
- データが含まれた HTAP コンテナーで Azure Cosmos DB アカウントをプロビジョニングする
- Azure Cosmos DB HTAP コンテナーをワークスペースに接続する
- Spark から専用 SQL プールにデータをインポートするための適切なセットアップを行う
手順
このチュートリアルでは、トランザクション ストアに影響を与えないように分析ストアに接続します (要求ユニットを消費しません)。 ここでは、次の手順について説明します。
- Azure Cosmos DB HTAP コンテナーを Spark データフレームに読み取る
- 新しいデータフレームで結果を集計する
- 専用 SQL プールにデータを取り込む
Data
この例では、RetailSales という HTAP コンテナーを使用します。 これは、ConnectedData というリンクされたサービスの一部です。スキーマは次のとおりです。
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- advertising: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: string (nullable = true)
レポート用に、売上 ("数量"、"収益" (価格 x 数量)) を productCode と weekStarting で集計します。 最後に、そのデータを dbo.productsales
という専用 SQL プール テーブルにエクスポートします。
Spark ノートブックを構成する
Scala as Spark (Scala) を主要言語とした Spark ノートブックを作成します。 セッションにはノートブックのデフォルト設定を使用します。
Spark でデータを読み取る
Spark を使用した Azure Cosmos DB HTAP コンテナーを最初のセルのデータフレームに読み取ります。
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
新しいデータフレームで結果を集計する
2 番目のセルでは、専用 SQL プール データベースに読み込む前に、新しいデータフレームに必要な変換と集計を実行します。
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
結果を専用 SQL プールに読み込む
3 番目のセルでは、データを専用 SQL プールに読み込みます。 これにより、ジョブが完了したら削除される一時的な外部テーブル、外部データ ソース、外部ファイル形式が自動的に作成されます。
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
SQL を使用して結果のクエリを実行する
次の SQL スクリプトのような単純な SQL クエリを使用して、結果のクエリを実行できます。
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]