チュートリアル: Spark を使用して Azure Cosmos DB for NoSQL に接続する
適用対象: NoSQL
このチュートリアルでは、Azure Cosmos DB Spark コネクタを使って、Azure Cosmos DB for NoSQL アカウントのデータの読み取りや書き込みを行います。 このチュートリアルでは、Azure Databricks と Jupyter ノートブックを使って、Spark から API for NoSQL と統合する方法を見ていただきます。 Spark でサポートされている任意の言語またはインターフェイスを使用できますが、このチュートリアルでは Python と Scala に焦点を当てます。
このチュートリアルでは、次の作業を行う方法について説明します。
- Spark と Jupyter ノートブックを使って API for NoSQL アカウントに接続します。
- データベースとコンテナーのリソースを作成します。
- データをコンテナーに取り込みます。
- コンテナー内のデータのクエリを実行します。
- コンテナー内の項目に対して一般的な操作を実行します。
前提条件
- 既存の Azure Cosmos DB for NoSQL アカウント。
- Azure サブスクリプションを既にお持ちの場合は、新しいアカウントを作成します。
- Azure サブスクリプションがない場合。 Azure Cosmos DB を無料で試すことができます。クレジット カードは必要ありません。
- 既存の Azure Databricks ワークスペース。
Spark と Jupyter を使用して接続する
Apache Spark 3.4.x を使って Azure Cosmos DB for NoSQL アカウントに接続できる状態のコンピューティング クラスターを、既存の Azure Databricks ワークスペースを使って作成します。
Azure Databricks ワークスペースを開きます。
ワークスペースのインターフェイスで、新しいクラスターを作成します。 少なくとも次の設定でクラスターを構成します。
バージョン Value ランタイムのバージョン 13.3 LTS (Scala 2.12、Spark 3.4.1) ワークスペースのインターフェイスを使って、グループ ID が
com.azure.cosmos.spark
である Maven パッケージを Maven Central で検索します。 成果物 ID の前にazure-cosmos-spark_3-4
が付いている Spark 3.4 固有のパッケージを、クラスターにインストールします。最後に、新しいノートブックを作成します。
ヒント
既定では、ノートブックは最近作成されたクラスターにアタッチされます。
ノートブック内で、NoSQL アカウント エンドポイント、データベース名、コンテナー名のオンライン トランザクション処理 (OLTP) 構成設定を設定します。
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
データベースとコンテナーを作成する
Catalog API を使って、データベースやコンテナーなどのアカウント リソースを管理します。 その後、OLTP を使ってコンテナー リソース内のデータを管理できます。
Spark を使って API for NoSQL リソースを管理するように Catalog API を構成します。
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
CREATE DATABASE IF NOT EXISTS
を使用して、cosmicworks
という新しいデータベースを作成します。# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
CREATE TABLE IF NOT EXISTS
を使って、products
という名前の新しいコンテナーを作成します。 パーティション キーのパスを/category
に設定し、最大スループットを1000
1 秒あたりの要求ユニット数 (RU) にして自動スケーリング スループットを有効にします。# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
階層的なパーティション キー構成を使用して、
employees
という名前を付けた別のコンテナーを作成します。 パーティション キー パスのセットとして、/organization
、/department
、/team
を使用します。 その特定の順序に従います。 また、スループットを400
RU の手動の量に設定します。# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
ノートブックのセルを実行して、API for NoSQL アカウント内にデータベースとコンテナーが作成されることを検証します。
データの取り込み
サンプル データセットを作成します。 次に、OLTP を使用して、そのデータを NoSQL コンテナー用 API に取り込みます。
サンプル データセットを作成します。
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
spark.createDataFrame
と前に保存した OLTP 構成を使って、サンプル データをターゲット コンテナーに追加します。# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
クエリ データ
OLTP データをデータ フレームに読み込み、データに対して一般的なクエリを実行します。 さまざまな構文を使って、データのフィルターまたはクエリを実行できます。
spark.read
を使って、OLTP データをデータフレーム オブジェクトに読み込みます。 このチュートリアルで前に使ったのと同じ構成を使います。 また、spark.cosmos.read.inferSchema.enabled
をtrue
に設定して、Spark コネクタが既存の項目をサンプリングしてスキーマを推論できるようにします。# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
printSchema
を使って、データフレームに読み込まれたデータのスキーマをレンダリングします。# Render schema df.printSchema()
// Render schema df.printSchema()
quantity
列が20
未満のデータ行をレンダリングします。 このクエリを実行するには、where
とshow
関数を使います。# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
clearance
列がtrue
である最初のデータ行をレンダリングします。 このクエリを実行するには、filter
関数を使います。# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
フィルターまたは切り詰めなしで、5 行のデータをレンダリングします。
show
関数を使って、外観とレンダリングされる行数をカスタマイズします。# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
次の生の NoSQL クエリ文字列を使ってデータのクエリを実行します:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
一般的な操作を実行する
Spark で API for NoSQL データを使用するときは、生の JSON として部分的な更新を実行したりデータを操作したりできます。
項目の部分的な更新を実行するには、次の操作を実行します。
既存の
config
構成変数をコピーし、新しいコピーでプロパティを変更します。 具体的には、ItemPatch
への書き込み戦略を構成します。 その後、一括サポートを無効にします。 列とマップされた操作を設定します。 最後に、既定の操作の種類をSet
に設定します。# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
このパッチ操作の一部としてターゲットにする項目のパーティション キーと一意識別子のための変数を作成します。
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
ターゲット項目を指定し、変更する必要があるフィールドを指定するための、パッチ オブジェクトのセットを作成します。
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
パッチ オブジェクトのセットを使用してデータ フレームを作成します。
write
を使用してパッチ操作を実行します。# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
クエリを実行して、パッチ操作の結果を確認します。 これで、項目は
Yamba New Surfboard
という名前になり、それ以外は変更されていないはずです。# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
生の JSON データを操作するには、次の操作を実行します。
既存の
config
構成変数をコピーし、新しいコピーでプロパティを変更します。 具体的には、ターゲット コンテナーをemployees
に変更します。 次に、未加工の JSON データを使用するように、contacts
列/フィールドを構成します。# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
コンテナーに取り込むための一連の従業員を作成します。
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
データ フレームを作成し、
write
を使って従業員データを取り込みます。# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
show
を使ってデータ フレームからデータをレンダリングします。contacts
列が出力で生の JSON であることを確認します。# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
関連するコンテンツ
- Apache Spark
- Azure Cosmos DB Catalog API
- 構成パラメーター参照
- Azure Cosmos DB Spark コネクタのサンプル
- Spark 2.4 から Spark 3.* に移行する
- バージョン互換性:
- リリース ノート:
- ダウンロード リンク: