在 Salesforce Data Cloud 上运行联合查询
本文介绍如何设置 Lakehouse Federation,以便对不受 Azure Databricks 管理的 Salesforce Data Cloud 数据运行联合查询。 若要详细了解 Lakehouse Federation,请参阅“什么是 Lakehouse Federation?”。
若要使用 Lakehouse Federation 连接到 Salesforce Data Cloud 数据库,必须在 Azure Databricks Unity Catalog 元存储中创建以下内容:
- 与 Salesforce Data Cloud 数据库的连接。
- 镜像 Unity Catalog 中的 Salesforce Data Cloud 数据库外部目录,以便你可使用 Unity Catalog 查询语法和数据治理工具来管理 Azure Databricks 用户对数据库的访问。
开始之前
工作区要求:
- 已为 Unity Catalog 启用工作区。
计算要求:
- 从 Databricks Runtime 群集或 SQL 仓库到目标数据库系统的网络连接。 请参阅 Lakehouse Federation 网络建议。
- Azure Databricks 群集必须使用 Databricks Runtime 15.2(或更高版本)和共享或单用户访问模式。
- SQL 仓库必须是 Pro 或无服务器仓库,且必须使用 2024.30 或更高版本。
所需的权限:
- 若要创建连接,你必须是元存储管理员或对附加到工作区的 Unity Catalog 元存储具有
CREATE CONNECTION
权限的用户。 - 若要创建外部目录,必须对元存储具有
CREATE CATALOG
权限,并且是连接的所有者或对连接具有CREATE FOREIGN CATALOG
特权。
后面每个基于任务的部分都指定了其他权限要求。
创建与 Salesforce 连接的应用程序
Salesforce 连接的应用允许外部应用程序使用 API 和标准协议与 Salesforce 集成。 本部分介绍如何使用 SSO 创建连接的应用,以允许 Databricks 使用 Salesforce 进行身份验证。
注意
有关更详细的说明,请参阅 Salesforce Data Cloud 文档中的创建连接应用。
若要创建与 Salesforce 连接的应用,请执行以下操作:
- 在 Data Cloud 的右上角,单击“设置”。
- 在“平台工具”下,单击“应用”>“应用管理器”。
- 单击“新建连接的应用”。
- 输入姓名和联系人电子邮件地址。
- 启用“OAuth 设置”:
- 输入回叫 URL,格式如下:
https://<databricks_instance_url>/login/oauth/salesforce.html
。 例如:https://cust-success.cloud.databricks.com/login/oauth/salesforce.html
。 - (可选)如果计划在下一步中使用 SQL 创建 Azure Databricks 连接和外部目录,则 Salesforce Connected 应用程序还需要支持重定向 URI
https://login.salesforce.com/services/oauth2/success
。 如果计划使用目录资源管理器创建 Azure Databricks 连接和外部目录,则不需如此。 Databricks 建议使用目录资源管理器,因为它需要的手动步骤比其他方法更少。 - 添加以下范围:
- 访问所有 Data Cloud API 资源 (cdp_api)
- 通过 API 管理用户数据 (api)
- 对 Data Cloud 数据执行 ANSI SQL 查询 (cdp_query_api)
- 随时执行请求(refresh_token、offline_access)
- 单击“ 保存”。
- 单击“继续” 。
- 输入回叫 URL,格式如下:
- 在“连接的应用程序概述”页面上,单击“管理使用者详细信息”。 系统将提示你进行身份验证。
- 身份验证成功后,页面会显示使用者密钥和使用者机密。 保存这些值。 创建 Azure Databricks 连接时需要用到它们。
创建 Azure Databricks 连接
连接指定用于访问外部数据库系统的路径和凭据。 若要创建连接,可以使用目录资源管理器,或者使用 Azure Databricks 笔记本或 Databricks SQL 查询编辑器中的 CREATE CONNECTION
SQL 命令。
注意
你还可以使用 Databricks REST API 或 Databricks CLI 来创建连接。 请参阅 POST /api/2.1/unity-catalog/connections 和 Unity Catalog 命令。
所需的权限:具有 CREATE CONNECTION
特权的元存储管理员或用户。
目录资源管理器
- 在 Azure Databricks 工作区中,单击 “目录”。
- 在左窗格中,展开“外部数据”菜单,然后选择“连接”。
- 单击“创建连接”。
- 输入用户友好的连接名称。
- 选择 Salesforce Data Cloud 的连接类型。
- 输入 Salesforce Data Cloud 的以下连接属性。
- 身份验证类型:
OAuth
- 是沙盒
false
- (OAuth) 客户端密码:Salesforce 连接的应用程序使用者机密
- (OAuth) 客户端 ID:Salesforce 连接的应用程序使用者密钥
- (OAuth) 客户端范围:
cdp_api api cdp_query_api refresh_token offline_access
- 身份验证类型:
- 单击“使用 Salesforce 登录”。
- (OAuth) 系统会提示使用 SSO 凭据登录到 Salesforce Data Cloud。
- 成功登录后,将定向回 Databricks“创建连接”页面。 已将“使用 Salesforce 登录”按钮替换为
Successfully authorized
消息。 - (可选)添加注释。
- 单击“创建”。
SQL
Databricks 建议使用目录资源管理器创建连接和外部目录,因为它需要的手动步骤比其他方法更少。
如果计划使用 SQL 创建 Azure Databricks 连接和外部目录,则 Salesforce Connected 应用程序需要支持重定向 URI https://login.salesforce.com/services/oauth2/success
。 如果使用目录资源管理器,则不需如此。
生成 PKCE 代码验证器和代码质询代码。 可以使用联机工具(例如 https://tonyxu-io.github.io/pkce-generator/)或运行以下 Python 脚本执行此操作:
%python import base64 import re import os import hashlib code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8') code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier) code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8') code_challenge = code_challenge.replace('=', '') print(f"pkce_verifier = \"{code_verifier}\"") print(f"code_challenge = \"{code_challenge}\"")
访问以下 URL 并使用 Salesforce 凭据进行身份验证,以获取
authorization_code
(用你的参数替换<client_id>
和<code_challenge>
)。https://login.salesforce.com/services/oauth2/authorize ?client_id=<client_id> &redirect_uri=https://login.salesforce.com/services/oauth2/success &response_type=code &code_challenge=<code_challenge>
URL 编码的授权代码在重定向 URL 中可见。
在笔记本或 Databricks SQL 查询编辑器中运行以下命令:
CREATE CONNECTION '<Connection name>' TYPE salesforce_data_cloud OPTIONS ( client_id '<Consumer key from Salesforce Connected App>', client_secret '<Consumer secret from Salesforce Connected App>', pkce_verifier '<pkce_verifier from the last step>', authorization_code '<URL decoded `authorization_code`, should end with == instead of %3D%3D>', oauth_redirect_uri "https://login.salesforce.com/services/oauth2/success", oauth_scope "cdp_api api cdp_query_api refresh_token offline access", is_sandbox "false" );
Databricks 建议对凭据等敏感值使用 Azure Databricks 机密而不是纯文本字符串。 例如:
CREATE CONNECTION '<Connection name>' TYPE salesforce_data_cloud OPTIONS ( client_id secret ('<Secret scope>','<Secret key client id>'), client_secret secret ('<Secret scope>','<Secret key client secret>'), pkce_verifier '<pkce_verifier from the last step>', authorization_code '<URL decoded `authorization_code`, should end with == instead of %3D%3D>', oauth_redirect_uri "https://login.salesforce.com/services/oauth2/success", oauth_scope "cdp_api api cdp_query_api refresh_token offline access", is_sandbox "false" );
有关设置机密的详细信息,请参阅机密管理。
创建外部目录
外部目录镜像外部数据系统中的数据库,以便可以使用 Azure Databricks 和 Unity Catalog 查询和管理对该数据库中数据的访问。 若要创建外部目录,请使用与已定义的数据源的连接。
要创建外部目录,可以使用目录资源管理器,或在 Azure Databricks 笔记本或 SQL 查询编辑器中使用 CREATE FOREIGN CATALOG
SQL 命令。
注意
你还可以使用 Databricks REST API 或 Databricks CLI 来创建目录。 请参阅 POST /api/2.1/unity-catalog/catalogs 和 Unity Catalog 命令。
所需的权限:对元存储具有 CREATE CATALOG
权限,并且具有连接的所有权或对连接具有 CREATE FOREIGN CATALOG
权限。
目录资源管理器
- 在 Azure Databricks 工作区中,单击 “目录”以打开目录资源管理器。
- 单击右上角的“创建目录”。
- 输入 Salesforce Data Cloud 目录的以下属性。
- 目录名称:目录的用户友好名称。
- 类型:
Foreign
。 - 连接名称:将用于创建目录的连接的名称。
- 数据空间:Salesforce 数据空间。
- 单击“创建”。
SQL
在笔记本或 SQL 查询编辑器中运行以下 SQL 命令。 括号中的项是可选的。
CREATE FOREIGN CATALOG [IF NOT EXISTS] '<catalog-name>' USING CONNECTION '<connection-name>'
OPTIONS (dataspace '<dataspace>');
请替换以下值:
<catalog-name>
:<connection-name>
:<dataspace>
:Salesforce 数据空间。 例如default
。
支持的下推
支持以下下推:
- 筛选器
- 投影
- 限制
- 聚合
- Offset
- 强制转换
- Contains、Startswith、Endswith
数据类型映射
从 Salesforce Data Cloud 读取到 Spark 时,数据类型将映射如下:
Salesforce Data Cloud 类型 | Spark 类型 |
---|---|
布尔 | BooleanType |
日期 | DateType |
日期/时间 | TimestampType |
电子邮件、电话、文本、URL | StringType |
数字、百分比 | DecimalType(38, 18) |
限制
- 每个 Databricks 目录仅支持一个 Salesforce 数据空间。