搭配適用於 NoSQL 的 Azure Cosmos DB 的 Spark 3 連接器使用服務主體
在本文中,您將瞭解如何建立可搭配角色型訪問控制使用的Microsoft Entra 應用程式和服務主體。 然後,您可以使用此服務主體從 Spark 3 連線到適用於 NoSQL 的 Azure Cosmos DB 帳戶。
必要條件
- 現有的 Azure Cosmos DB for NoSQL 帳戶。
- 如果您有現有的 Azure 訂用帳戶,請建立新的帳戶。
- 沒有 Azure 訂用帳戶? 您可以免費試用 Azure Cosmos DB,不需要信用卡。
- 現有 Azure Databricks 工作區。
- 已註冊Microsoft Entra 應用程式和服務主體。
- 如果您沒有服務主體和應用程式,請使用 Azure 入口網站 註冊應用程式。
建立秘密和記錄認證
在本節中,您會建立客戶端密碼,並記錄值以供稍後使用。
開啟 Azure 入口網站。
移至現有的 Microsoft Entra 應用程式。
移至 [ 憑證與秘密] 頁面。 然後,建立新的秘密。 儲存 客戶端密碼 值,以供本文稍後使用。
移至概觀頁面。 找出並記錄應用程式 (用戶端) 識別碼、物件識別碼和目錄 (租使用者) 識別碼的值。 您稍後也會在本文中使用這些值。
移至您現有的適用於 NoSQL 的 Azure Cosmos DB 帳戶。
在 [概觀] 頁面上記錄 URI 值。 也記錄訂用帳戶 標識碼 和資源 群組 值。 在本文後續內容中,您將會用到這些值。
建立定義和指派
在本節中,您會建立Microsoft Entra ID 角色定義。 然後,您會指派具有讀取和寫入容器中項目的許可權角色。
使用
az role definition create
命令建立角色。 傳入適用於 NoSQL 的 Azure Cosmos DB 帳戶名稱和資源群組,後面接著定義自定義角色的 JSON 主體。 此角色也會使用/
來限定為帳戶層級。 請確定RoleName
您使用要求本文的 屬性,為您的角色提供唯一的名稱。az cosmosdb sql role definition create \ --resource-group "<resource-group-name>" \ --account-name "<account-name>" \ --body '{ "RoleName": "<role-definition-name>", "Type": "CustomRole", "AssignableScopes": ["/"], "Permissions": [{ "DataActions": [ "Microsoft.DocumentDB/databaseAccounts/readMetadata", "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*", "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*" ] }] }'
列出您建立的角色定義,以在 JSON 輸出中擷取其唯一標識碼。
id
記錄 JSON 輸出的值。az cosmosdb sql role definition list \ --resource-group "<resource-group-name>" \ --account-name "<account-name>"
[ { ..., "id": "/subscriptions/<subscription-id>/resourceGroups/<resource-grou-name>/providers/Microsoft.DocumentDB/databaseAccounts/<account-name>/sqlRoleDefinitions/<role-definition-id>", ... "permissions": [ { "dataActions": [ "Microsoft.DocumentDB/databaseAccounts/readMetadata", "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*", "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*" ], "notDataActions": [] } ], ... } ]
使用
az cosmosdb sql role assignment create
來建立角色指派。 將取代<aad-principal-id>
為您稍早在本文中記錄的物件 標識碼 。 此外,請將 取代<role-definition-id>
為id
在上一個步驟中執行 命令所擷az cosmosdb sql role definition list
取的值。az cosmosdb sql role assignment create \ --resource-group "<resource-group-name>" \ --account-name "<account-name>" \ --scope "/" \ --principal-id "<account-name>" \ --role-definition-id "<role-definition-id>"
使用服務主體
既然您已建立Microsoft Entra 應用程式和服務主體、建立自定義角色,並將該角色許可權指派給適用於 NoSQL 的 Azure Cosmos DB 帳戶,您應該能夠執行筆記本。
開啟您的 Azure Databricks 工作區。
在工作區介面中,建立新的叢集。 設定叢集時,請至少符合下列設定:
版本 值 執行階段版本 13.3 LTS (Scala 2.12, Spark 3.4.1)
使用工作區介面,從 Maven Central 搜尋具有群組標識碼的
com.azure.cosmos.spark
Maven 套件。 特別針對Spark 3.4安裝套件,其前置詞為azure-cosmos-spark_3-4
叢集的成品標識碼。最後,建立新的筆記本。
提示
根據預設,筆記本會附加至最近建立的叢集。
在筆記本中,設定 NoSQL 帳戶端點、資料庫名稱和容器名稱的 Azure Cosmos DB Spark 連接器組態設定。 使用本文稍早記錄的 訂用帳戶標識碼、 資源群組、 應用程式(用戶端)標識碼、 目錄(租使用者)標識碼和 客戶端密碼 值。
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.auth.type": "ServicePrincipal", "spark.cosmos.account.subscriptionId": "<subscription-id>", "spark.cosmos.account.resourceGroupName": "<resource-group-name>", "spark.cosmos.account.tenantId": "<entra-tenant-id>", "spark.cosmos.auth.aad.clientId": "<entra-app-client-id>", "spark.cosmos.auth.aad.clientSecret": "<entra-app-client-secret>", "spark.cosmos.database": "<database-name>", "spark.cosmos.container": "<container-name>" }
// Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.auth.type" -> "ServicePrincipal", "spark.cosmos.account.subscriptionId" -> "<subscription-id>", "spark.cosmos.account.resourceGroupName" -> "<resource-group-name>", "spark.cosmos.account.tenantId" -> "<entra-tenant-id>", "spark.cosmos.auth.aad.clientId" -> "<entra-app-client-id>", "spark.cosmos.auth.aad.clientSecret" -> "<entra-app-client-secret>", "spark.cosmos.database" -> "<database-name>", "spark.cosmos.container" -> "<container-name>" )
使用 Spark 設定目錄 API 來管理 NoSQL 資源的 API。
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", "<nosql-account-endpoint>") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "ServicePrincipal") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", "<subscription-id>") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", "<resource-group-name>") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", "<entra-tenant-id>") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", "<entra-app-client-id>") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", "<entra-app-client-secret>")
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", "<nosql-account-endpoint>") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "ServicePrincipal") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", "<subscription-id>") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", "<resource-group-name>") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", "<entra-tenant-id>") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", "<entra-app-client-id>") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", "<entra-app-client-secret>")
使用
CREATE DATABASE IF NOT EXISTS
建立新的資料庫。 請確定您提供資料庫名稱。# Create a database using the Catalog API spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format("<database-name>"))
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.<database-name>;")
使用您指定的資料庫名稱、容器名稱、分割區索引鍵路徑和輸送量值,建立新的容器。
# Create a products container using the Catalog API spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '{}', manualThroughput = '{}')".format("<database-name>", "<container-name>", "<partition-key-path>", "<throughput>"))
// Create a products container using the Catalog API spark.sql(s"CREATE TABLE IF NOT EXISTS cosmosCatalog.<database-name>.<container-name> using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '<partition-key-path>', manualThroughput = '<throughput>')")
建立範例數據集。
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
使用
spark.createDataFrame
先前儲存的在線事務處理 (OLTP) 組態,將範例數據新增至目標容器。# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
提示
在本快速入門範例中,認證會以純文本指派給變數。 基於安全性,我們建議您使用秘密。 如需如何設定秘密的詳細資訊,請參閱 將秘密新增至Spark設定。