从 Salesforce 引入数据
重要
LakeFlow Connect 目前以封闭的公共预览版提供。 若要参与预览,请联系你的 Databricks 帐户团队。
本文介绍了如何使用 LakeFlow Connect 从 Salesforce 引入数据并将其加载到 Azure Databricks 中。 最终的引入管道由 Unity Catalog 管理,并由无服务器计算和增量实时表提供支持。
Salesforce 引入连接器支持以下源:
- Salesforce Sales Cloud
开始之前
若要创建引入管道,必须满足以下要求:
你的工作区启用了 Unity Catalog。
笔记本、工作流和增量实时表启用了无服务器计算。 请参阅启用无服务器计算。
若要创建连接:需要拥有对元存储的
CREATE CONNECTION
权限。若要使用现有的连接:需要拥有连接对象的
USE CONNECTION
或ALL PRIVILEGES
权限。目标目录的
USE CATALOG
权限。现有架构的
USE SCHEMA
和CREATE TABLE
权限,或目标目录的CREATE SCHEMA
权限。(推荐)创建 Databricks 可用于检索数据的 Salesforce 用户。 确保用户具有 API 访问权限,并有权访问你计划引入的所有对象。
创建 Salesforce 连接
所需的权限:元存储上的 CREATE CONNECTION
。 请联系元存储管理员来授予此权限。
如果要使用现有连接创建引入管道,请跳到以下部分。 需要 USE CONNECTION
或 ALL PRIVILEGES
连接。
若要创建 Salesforce 连接,请执行以下操作:
在 Azure Databricks 工作区中,单击“目录”>“外部位置”>“连接”>“创建连接”。
对于“连接名称”,请为 Salesforce 连接指定一个唯一名称。
对于“连接类型”,请单击“Salesforce”。
如果要从 Salesforce 沙盒帐户引入,请将“是沙盒”设置为
true
。单击“使用 Salesforce 登录”。
如果要从 Salesforce 沙盒引入,请单击“ 使用自定义域”。 提供沙盒 URL,然后继续登录。 Databricks 建议以专用于 Databricks 引入的 Salesforce 用户身份登录。
返回到“创建连接”页后,单击“创建”。
创建引入管道
所需的权限:USE CONNECTION
或ALL PRIVILEGES
连接。
此步骤介绍了如何创建引入管道。 默认情况下,每个引入的表都对应于目标中的一个同名(但全部小写)的流式处理表,除非你明确重命名它。
Databricks UI
在 Azure Databricks 工作区的边栏中,单击 “数据引入”。
在“添加数据”页上的“Databricks 连接器”下,单击“Salesforce”。
Salesforce 引入向导随即打开。
在 向导的“管道 ”页上,输入引入管道的唯一名称。
在 “目标目录 ”下拉列表中,选择一个目录。 引入的数据和事件日志将写入此目录。
选择存储访问 Salesforce 数据所需的凭据的 Unity 目录连接。
如果没有 Salesforce 连接,请单击“ 创建连接”。 你必须对元存储具有
CREATE CONNECTION
特权。单击“ 创建管道”并继续。
在 “源 ”页上,选择要引入 Databricks 的 Salesforce 表,然后单击“ 下一步”。
如果选择架构,Salesforce 引入连接器会将源架构中的所有现有表和将来表写入 Unity 目录托管表。
在 “目标 ”页上,选择要写入的 Unity 目录和架构。
如果不想使用现有架构,请单击“ 创建架构”。 你必须对父目录拥有和
USE CATALOG
CREATE SCHEMA
特权。单击“ 保存管道”并继续。
在 “设置” 页上,单击“ 创建计划”。 设置刷新目标表的频率。
(可选)为管道操作成功或失败设置电子邮件通知。
单击“ 保存”并运行管道。
Databricks 资产捆绑包
此选项卡介绍如何使用 Databricks 资产捆绑包(DAB)部署引入管道。 捆绑包可以包含作业和任务的 YAML 定义,使用 Databricks CLI 进行管理,并且可以在不同的目标工作区(如开发、过渡和生产工作区)中共享和运行。 有关详细信息,请参阅 Databricks 资产捆绑包。
使用 Databricks CLI 创建新捆绑包:
databricks bundle init
将两个新资源文件添加到捆绑包:
- 管道定义文件 (
resources/sfdc_pipeline.yml
)。 - 控制数据引入频率的工作流文件(
resources/sfdc_job.yml
)。
以下是一个示例
resources/sfdc_pipeline.yml
文件:variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for sfdc_dab resources: pipelines: pipeline_sfdc: name: salesforce_pipeline ingestion_definition: connection_name: <salesforce-connection> objects: # An array of objects to ingest from Salesforce. This example # ingests the AccountShare, AccountPartner, and ApexPage objects. - table: source_schema: objects source_table: AccountShare destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: AccountPartner destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: ApexPage destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} channel: "preview"
以下是一个示例
resources/sfdc_job.yml
文件:resources: jobs: sfdc_dab_job: name: sfdc_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
- 管道定义文件 (
使用 Databricks CLI 部署管道:
databricks bundle deploy
Databricks CLI
若要创建管道
databricks pipelines create --json "<pipeline-definition | json-file-path>"
更新管道:
databricks pipelines update --json "<<pipeline-definition | json-file-path>"
获取管道定义:
databricks pipelines get "<pipeline-id>"
删除管道:
databricks pipelines delete "<pipeline-id>"
若要获得详细信息,可以运行:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
在管道上启动、计划和设置警报
创建管道后,重新访问 Databricks 工作区,然后单击“Delta Live Tables”。
新管道将显示在管道列表中。
若要查看管道详细信息,请单击管道名称。
在管道详细信息页上,单击“开始”运行管道。 你还可以通过单击“计划”来计划管道。
若要在管道上设置警报,请依次单击“计划”、“更多选项”,然后添加通知。
引入完成后,可以查询表。
注意
管道运行时,你可能会看到给定表的两个源视图。 一个视图包含公式字段的快照。 另一个视图包含非公式字段的增量数据拉取。 这些视图在目标表中联接。