从 Salesforce 引入数据

重要

LakeFlow Connect 目前以封闭的公共预览版提供。 若要参与预览,请联系你的 Databricks 帐户团队。

本文介绍了如何使用 LakeFlow Connect 从 Salesforce 引入数据并将其加载到 Azure Databricks 中。 最终的引入管道由 Unity Catalog 管理,并由无服务器计算和增量实时表提供支持。

Salesforce 引入连接器支持以下源:

  • Salesforce Sales Cloud

开始之前

若要创建引入管道,必须满足以下要求:

  • 你的工作区启用了 Unity Catalog。

  • 笔记本、工作流和增量实时表启用了无服务器计算。 请参阅启用无服务器计算

  • 若要创建连接:需要拥有对元存储的 CREATE CONNECTION 权限。

    若要使用现有的连接:需要拥有连接对象的 USE CONNECTIONALL PRIVILEGES 权限。

  • 目标目录的 USE CATALOG 权限。

  • 现有架构的 USE SCHEMACREATE TABLE 权限,或目标目录的 CREATE SCHEMA 权限。

  • (推荐)创建 Databricks 可用于检索数据的 Salesforce 用户。 确保用户具有 API 访问权限,并有权访问你计划引入的所有对象。

创建 Salesforce 连接

所需的权限:元存储上的 CREATE CONNECTION 请联系元存储管理员来授予此权限。

如果要使用现有连接创建引入管道,请跳到以下部分。 需要 USE CONNECTIONALL PRIVILEGES 连接。

若要创建 Salesforce 连接,请执行以下操作:

  1. 在 Azure Databricks 工作区中,单击“目录”>“外部位置”>“连接”>“创建连接”。

  2. 对于“连接名称”,请为 Salesforce 连接指定一个唯一名称。

  3. 对于“连接类型”,请单击“Salesforce”

  4. 如果要从 Salesforce 沙盒帐户引入,请将“是沙盒”设置为 true

  5. 单击“使用 Salesforce 登录”

    Salesforce 登录

  6. 如果要从 Salesforce 沙盒引入,请单击“ 使用自定义域”。 提供沙盒 URL,然后继续登录。 Databricks 建议以专用于 Databricks 引入的 Salesforce 用户身份登录。

    “使用自定义域”按钮

    输入沙盒 URL

  7. 返回到“创建连接”页后,单击“创建”。

创建引入管道

所需的权限:USE CONNECTIONALL PRIVILEGES连接。

此步骤介绍了如何创建引入管道。 默认情况下,每个引入的表都对应于目标中的一个同名(但全部小写)的流式处理表,除非你明确重命名它。

Databricks UI

  1. 在 Azure Databricks 工作区的边栏中,单击 “数据引入”。

  2. “添加数据”页上的“Databricks 连接器”下,单击“Salesforce”。

    Salesforce 引入向导随即打开。

  3. 向导的“管道 ”页上,输入引入管道的唯一名称。

  4. “目标目录 ”下拉列表中,选择一个目录。 引入的数据和事件日志将写入此目录。

  5. 选择存储访问 Salesforce 数据所需的凭据的 Unity 目录连接。

    如果没有 Salesforce 连接,请单击“ 创建连接”。 你必须对元存储具有 CREATE CONNECTION 特权。

  6. 单击“ 创建管道”并继续

  7. “源 ”页上,选择要引入 Databricks 的 Salesforce 表,然后单击“ 下一步”。

    如果选择架构,Salesforce 引入连接器会将源架构中的所有现有表和将来表写入 Unity 目录托管表。

  8. “目标 ”页上,选择要写入的 Unity 目录和架构。

    如果不想使用现有架构,请单击“ 创建架构”。 你必须对父目录拥有和USE CATALOGCREATE SCHEMA特权。

  9. 单击“ 保存管道”并继续

  10. “设置” 页上,单击“ 创建计划”。 设置刷新目标表的频率。

  11. (可选)为管道操作成功或失败设置电子邮件通知。

  12. 单击“ 保存”并运行管道

Databricks 资产捆绑包

此选项卡介绍如何使用 Databricks 资产捆绑包(DAB)部署引入管道。 捆绑包可以包含作业和任务的 YAML 定义,使用 Databricks CLI 进行管理,并且可以在不同的目标工作区(如开发、过渡和生产工作区)中共享和运行。 有关详细信息,请参阅 Databricks 资产捆绑包

  1. 使用 Databricks CLI 创建新捆绑包:

    databricks bundle init
    
  2. 将两个新资源文件添加到捆绑包:

    • 管道定义文件 (resources/sfdc_pipeline.yml)。
    • 控制数据引入频率的工作流文件(resources/sfdc_job.yml)。

    以下是一个示例 resources/sfdc_pipeline.yml 文件:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          channel: "preview"
    

    以下是一个示例 resources/sfdc_job.yml 文件:

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. 使用 Databricks CLI 部署管道:

    databricks bundle deploy
    

Databricks CLI

若要创建管道

databricks pipelines create --json "<pipeline-definition | json-file-path>"

更新管道:

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

获取管道定义:

databricks pipelines get "<pipeline-id>"

删除管道:

databricks pipelines delete "<pipeline-id>"

若要获得详细信息,可以运行:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

在管道上启动、计划和设置警报

  1. 创建管道后,重新访问 Databricks 工作区,然后单击“Delta Live Tables”。

    新管道将显示在管道列表中。

  2. 若要查看管道详细信息,请单击管道名称。

  3. 在管道详细信息页上,单击“开始”运行管道。 你还可以通过单击“计划”来计划管道。

  4. 若要在管道上设置警报,请依次单击“计划”、“更多选项”,然后添加通知

  5. 引入完成后,可以查询表。

注意

管道运行时,你可能会看到给定表的两个源视图。 一个视图包含公式字段的快照。 另一个视图包含非公式字段的增量数据拉取。 这些视图在目标表中联接。