Azure Cosmos DB for NoSQL の Spark 3 コネクタでサービス プリンシパルを使う
この記事では、ロールベースのアクセス制御で使用できる、Microsoft Entra のアプリケーションとサービス プリンシパルを作成する方法について説明します。 その後、このサービス プリンシパルを使って、Spark 3 から Azure Cosmos DB for NoSQL アカウントに接続できるようになります。
前提条件
- 既存の Azure Cosmos DB for NoSQL アカウント。
- Azure サブスクリプションを既にお持ちの場合は、新しいアカウントを作成します。
- Azure サブスクリプションがない場合。 Azure Cosmos DB を無料で試すことができます。クレジット カードは必要ありません。
- 既存の Azure Databricks ワークスペース。
- 登録された Microsoft Entra アプリケーションとサービス プリンシパル。
- サービス プリンシパルとアプリケーションがない場合は、Azure portal を使ってアプリケーションを登録します。
シークレットを作成して資格情報を記録する
このセクションでは、クライアント シークレットを作成し、後で使用できるように値を記録します。
Azure Portalを開きます。
既存の Microsoft Entra アプリケーションに移動します。
[証明書とシークレット] ページに移動します。 次に新しいシークレットを作成します。 この記事の後半で使うために、[クライアント シークレット] の値を保存します。
[概要] ページに移動します。 [アプリケーション (クライアント) ID]、[オブジェクト ID]、[ディレクトリ (テナント) ID] の値を見つけて記録します。 これらの値は、この記事の後半でも使います。
既存の Azure Cosmos DB for NoSQL アカウントに移動します。
[概要] ページの [URI] 値を記録します。 また、[サブスクリプション ID] と [リソース グループ] の値も記録します。 これらの値は、この記事で後ほど使用します。
定義と割り当てを作成する
このセクションでは、Microsoft Entra ID ロール定義を作成します。 次に、コンテナー内の項目の読み取りと書き込みを行うアクセス許可を持つロールを割り当てます。
az role definition create
コマンドを使用して、ロールを作成します。 Azure Cosmos DB for NoSQL のアカウント名とリソース グループを渡し、その後にカスタム ロールを定義する JSON の本文を渡します。 ロールのスコープも/
を使用してアカウント レベルに設定されます。 ロールには、必ず要求本文のRoleName
プロパティを使って一意の名前を付けます。az cosmosdb sql role definition create \ --resource-group "<resource-group-name>" \ --account-name "<account-name>" \ --body '{ "RoleName": "<role-definition-name>", "Type": "CustomRole", "AssignableScopes": ["/"], "Permissions": [{ "DataActions": [ "Microsoft.DocumentDB/databaseAccounts/readMetadata", "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*", "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*" ] }] }'
作成したロール定義を列挙し、その一意の識別子を JSON 出力でフェッチします。 JSON 出力の
id
値を記録します。az cosmosdb sql role definition list \ --resource-group "<resource-group-name>" \ --account-name "<account-name>"
[ { ..., "id": "/subscriptions/<subscription-id>/resourceGroups/<resource-grou-name>/providers/Microsoft.DocumentDB/databaseAccounts/<account-name>/sqlRoleDefinitions/<role-definition-id>", ... "permissions": [ { "dataActions": [ "Microsoft.DocumentDB/databaseAccounts/readMetadata", "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*", "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*" ], "notDataActions": [] } ], ... } ]
az cosmosdb sql role assignment create
を使ってロールの割り当てを作成します。<aad-principal-id>
を、この記事の前半で記録した [オブジェクト ID] に置き換えます。 また、<role-definition-id>
を、前の手順でaz cosmosdb sql role definition list
コマンドを実行してフェッチしたid
値に置き換えます。az cosmosdb sql role assignment create \ --resource-group "<resource-group-name>" \ --account-name "<account-name>" \ --scope "/" \ --principal-id "<account-name>" \ --role-definition-id "<role-definition-id>"
サービス プリンシパルを使用する
Microsoft Entra アプリケーションとサービス プリンシパルを作成し、カスタム ロールを作成し、そのロールに Azure Cosmos DB for NoSQL アカウントへのアクセス許可を割り当てたので、ノートブックを実行できるようになります。
Azure Databricks ワークスペースを開きます。
ワークスペースのインターフェイスで、新しいクラスターを作成します。 少なくとも次の設定でクラスターを構成します。
バージョン Value ランタイムのバージョン 13.3 LTS (Scala 2.12, Spark 3.4.1)
ワークスペースのインターフェイスを使って、グループ ID が
com.azure.cosmos.spark
である Maven パッケージを Maven Central で検索します。 成果物 ID の前にazure-cosmos-spark_3-4
が付いている Spark 3.4 固有のパッケージを、クラスターにインストールします。最後に、新しいノートブックを作成します。
ヒント
既定では、ノートブックは最近作成されたクラスターにアタッチされます。
ノートブック内で、NoSQL アカウント エンドポイント、データベース名、コンテナー名に関する Azure Cosmos DB Spark コネクタ構成設定を設定します。 この記事の前半で記録した [サブスクリプション ID]、[リソース グループ]、[アプリケーション (クライアント) ID]、[ディレクトリ (テナント) ID]、[クライアント シークレット] の値を使います。
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.auth.type": "ServicePrincipal", "spark.cosmos.account.subscriptionId": "<subscription-id>", "spark.cosmos.account.resourceGroupName": "<resource-group-name>", "spark.cosmos.account.tenantId": "<entra-tenant-id>", "spark.cosmos.auth.aad.clientId": "<entra-app-client-id>", "spark.cosmos.auth.aad.clientSecret": "<entra-app-client-secret>", "spark.cosmos.database": "<database-name>", "spark.cosmos.container": "<container-name>" }
// Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.auth.type" -> "ServicePrincipal", "spark.cosmos.account.subscriptionId" -> "<subscription-id>", "spark.cosmos.account.resourceGroupName" -> "<resource-group-name>", "spark.cosmos.account.tenantId" -> "<entra-tenant-id>", "spark.cosmos.auth.aad.clientId" -> "<entra-app-client-id>", "spark.cosmos.auth.aad.clientSecret" -> "<entra-app-client-secret>", "spark.cosmos.database" -> "<database-name>", "spark.cosmos.container" -> "<container-name>" )
Spark を使って API for NoSQL リソースを管理するように Catalog API を構成します。
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", "<nosql-account-endpoint>") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "ServicePrincipal") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", "<subscription-id>") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", "<resource-group-name>") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", "<entra-tenant-id>") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", "<entra-app-client-id>") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", "<entra-app-client-secret>")
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", "<nosql-account-endpoint>") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "ServicePrincipal") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", "<subscription-id>") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", "<resource-group-name>") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", "<entra-tenant-id>") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", "<entra-app-client-id>") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", "<entra-app-client-secret>")
CREATE DATABASE IF NOT EXISTS
を使用して新しいデータベースを作成します。 データベース名を必ず指定してください。# Create a database using the Catalog API spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format("<database-name>"))
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.<database-name>;")
指定したデータベース名、コンテナー名、パーティション キー パス、スループットの値を使って、新しいコンテナーを作成します。
# Create a products container using the Catalog API spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '{}', manualThroughput = '{}')".format("<database-name>", "<container-name>", "<partition-key-path>", "<throughput>"))
// Create a products container using the Catalog API spark.sql(s"CREATE TABLE IF NOT EXISTS cosmosCatalog.<database-name>.<container-name> using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '<partition-key-path>', manualThroughput = '<throughput>')")
サンプル データセットを作成します。
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
spark.createDataFrame
と以前に保存したオンライン トランザクション処理 (OLTP) 構成を使用して、ターゲット コンテナーにサンプル データを追加します。# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
ヒント
このクイック スタートの例では、資格情報がクリア テキストで変数に割り当てられます。 セキュリティを確保するために、シークレットを使用することをお勧めします。 シークレットの構成の詳細については、Spark 構成にシークレットを追加する方法に関する記事を参照してください。