Ingest data from Salesforce
Important
LakeFlow Connect is in gated Public Preview. To participate in the preview, contact your Databricks account team.
This article describes how to ingest data from Salesforce and load it into Azure Databricks using LakeFlow Connect. The resulting ingestion pipeline is governed by Unity Catalog and is powered by serverless compute and Delta Live Tables.
The Salesforce ingestion connector supports the following source:
- Salesforce Sales Cloud
Before you begin
To create an ingestion pipeline, you must meet the following requirements:
Your workspace is enabled for Unity Catalog.
Serverless compute is enabled for notebooks, workflows, and Delta Live Tables. See Enable serverless compute.
To create a connection: You have
CREATE CONNECTION
on the metastore.To use an existing connection: You have
USE CONNECTION
orALL PRIVILEGES
on the connection object.USE CATALOG
on the target catalog.USE SCHEMA
andCREATE TABLE
on an existing schema orCREATE SCHEMA
on the target catalog.(Recommended) Create a Salesforce user that Databricks can use to retrieve data. Make sure that the user has API access and access to all of the objects that you plan to ingest.
Create a Salesforce connection
Permissions required: CREATE CONNECTION
on the metastore. Contact a metastore admin to grant this.
If you want to create an ingestion pipeline using an existing connection, skip to the following section. You need USE CONNECTION
or ALL PRIVILEGES
on the connection.
To create a Salesforce connection, do the following:
In the Azure Databricks workspace, click Catalog > External locations > Connections > Create connection.
For Connection name, specify a unique name for the Salesforce connection.
For Connection type, click Salesforce.
Set Auth type to OAuth.
If you’re ingesting from a Salesforce sandbox account, set Is sandbox to
true
.Click Log in with Salesforce.
If you’re ingesting from a Salesforce sandbox, click Use Custom Domain. Provide the sandbox URL, and then proceed to login. Databricks recommends logging in as a Salesforce user that’s dedicated to Databricks ingestion.
After returning to the Create Connection page, click Create.
Create an ingestion pipeline
Permissions required: USE CONNECTION
or ALL PRIVILEGES
on a connection.
This step describes how to create the ingestion pipeline. Each ingested table corresponds to a streaming table with the same name (but all lowercase) in the destination by default, unless you explicitly rename it.
Databricks UI
In the sidebar of the Azure Databricks workspace, click Data Ingestion.
On the Add data page, under Databricks connectors, click Salesforce.
The Salesforce ingestion wizard opens.
On the Pipeline page of the wizard, enter a unique name for the ingestion pipeline.
In the Destination catalog dropdown, select a catalog. Ingested data and event logs will be written to this catalog.
Select the Unity Catalog connection that stores the credentials required to access Salesforce data.
If there are no Salesforce connections, click Create connection. You must have the
CREATE CONNECTION
privilege on the metastore.Click Create pipeline and continue.
On the Source page, select the Salesforce tables to ingest into Databricks, and then click Next.
If you select a schema, the Salesforce ingestion connector writes all existing and future tables in the source schema to Unity Catalog managed tables.
On the Destination page, select the Unity Catalog catalog and schema to write to.
If you don’t want to use an existing schema, click Create schema. You must have the
USE CATALOG
andCREATE SCHEMA
privileges on the parent catalog.Click Save pipeline and continue.
On the Settings page, click Create schedule. Set the frequency to refresh the destination tables.
Optionally, set email notifications for pipeline operation success or failure.
Click Save and run pipeline.
Databricks Asset Bundles
This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles (DABs). Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.
Create a new bundle using the Databricks CLI:
databricks bundle init
Add two new resource files to the bundle:
- A pipeline definition file (
resources/sfdc_pipeline.yml
). - A workflow file that controls the frequency of data ingestion (
resources/sfdc_job.yml
).
The following is an example
resources/sfdc_pipeline.yml
file:variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for sfdc_dab resources: pipelines: pipeline_sfdc: name: salesforce_pipeline ingestion_definition: connection_name: <salesforce-connection> objects: # An array of objects to ingest from Salesforce. This example # ingests the AccountShare, AccountPartner, and ApexPage objects. - table: source_schema: objects source_table: AccountShare destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: AccountPartner destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: ApexPage destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} channel: "preview"
The following is an example
resources/sfdc_job.yml
file:resources: jobs: sfdc_dab_job: name: sfdc_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
- A pipeline definition file (
Deploy the pipeline using the Databricks CLI:
databricks bundle deploy
Databricks CLI
To create the pipeline:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
To update the pipeline:
databricks pipelines update --json "<<pipeline-definition | json-file-path>"
To get the pipeline definition:
databricks pipelines get "<pipeline-id>"
To delete the pipeline:
databricks pipelines delete "<pipeline-id>"
For more information, you can run:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Start, schedule, and set alerts on your pipeline
After the pipeline has been created, revisit the Databricks workspace, and then click Delta Live Tables.
The new pipeline appears in the pipeline list.
To view the pipeline details, click the pipeline name.
On the pipeline details page, run the pipeline by clicking Start. You can schedule the pipeline by clicking Schedule.
To set alerts on the pipeline, click Schedule, click More options, and then add a notification.
After ingestion completes, you can query your tables.
Note
When the pipeline runs, you might see two source views for a given table. One view contains the snapshots for formula fields. The other view contains the incremental data pulls for non-formula fields. These views are joined in the destination table.