Прием данных из Salesforce
Внимание
LakeFlow Connect находится в закрытой общедоступной предварительной версии. Чтобы принять участие в предварительной версии, обратитесь к группе учетной записи Databricks.
В этой статье описывается прием данных из Salesforce и его загрузка в Azure Databricks с помощью LakeFlow Connect. Результирующий конвейер приема управляется Unity Catalog с использованием бессерверных вычислительных ресурсов и Delta Live Tables.
Соединитель приема Salesforce поддерживает следующий источник:
- Salesforce Sales Cloud
Подготовка к работе
Чтобы создать конвейер приема, необходимо выполнить следующие требования:
Ваша рабочая область активирована для Unity Catalog.
Бессерверные вычисления доступны для записных книжек, рабочих процессов и Delta Live Tables. См. раздел "Включить бессерверные вычисления".
Создание подключения: у вас есть
CREATE CONNECTION
в хранилище метаданных.Для использования существующего подключения: у
USE CONNECTION
вас есть илиALL PRIVILEGES
объект подключения.USE CATALOG
в целевом каталоге.USE SCHEMA
иCREATE TABLE
на существующей схеме илиCREATE SCHEMA
на целевом каталоге.(Рекомендуется) Создайте пользователя Salesforce, который Databricks может использовать для извлечения данных. Убедитесь, что у пользователя есть доступ к API и доступ ко всем объектам, которые вы планируете принимать.
Создание подключения Salesforce
Необходимые разрешения:CREATE CONNECTION
в хранилище метаданных. Чтобы предоставить это, обратитесь к администратору хранилища метаданных.
Если вы хотите создать конвейер приема с помощью существующего подключения, перейдите к следующему разделу. Вам нужно USE CONNECTION
или ALL PRIVILEGES
подключиться.
Чтобы создать подключение Salesforce, сделайте следующее:
В рабочей области Azure Databricks щелкните Каталог > Внешние расположения > Подключения > Создать подключение.
Для имени подключения укажите уникальное имя подключения Salesforce.
Для типа подключения щелкните Salesforce.
Если вы получаете данные из учетной записи песочницы Salesforce, установите для параметр "Является песочницей" на
true
.Нажмите кнопку "Войти" с помощью Salesforce.
Если вы используете песочницу Salesforce, нажмите кнопку "Использовать личный домен". Укажите URL-адрес песочницы и перейдите к входу. Databricks рекомендует войти в систему как пользователь Salesforce, предназначенный для приема Databricks.
После возвращения на страницу "Создать подключение " нажмите кнопку "Создать".
Создание конвейера приема
Необходимые разрешения:USE CONNECTION
или ALL PRIVILEGES
подключение.
На этом шаге описывается создание конвейера приема. Каждая загруженная таблица по умолчанию соответствует таблице потоковой передачи с тем же именем, но написанная строчными буквами, в качестве пункта назначения, если вы явно не переименуете ее.
Пользовательский интерфейс Databricks
На боковой панели рабочей области Azure Databricks щелкните "Прием данных".
На странице "Добавление данных" в разделе Соединители Databricks щелкните Salesforce.
Откроется мастер приема Salesforce.
На странице конвейера мастера введите уникальное имя конвейера приема.
В раскрывающемся списке каталога
назначения выберите каталог. Полученные данные и журналы событий будут записаны в этот каталог. Выберите подключение каталога Unity, которое хранит учетные данные, необходимые для доступа к данным Salesforce.
Если подключений Salesforce нет, щелкните Создать подключение. У вас должна быть привилегия
CREATE CONNECTION
в хранилище метаданных.Нажмите кнопку "Создать конвейер" и продолжить.
На странице источника выберите таблицы Salesforce для загрузки в Databricks, а затем нажмите кнопку Далее.
При выборе схемы соединитель загрузки Salesforce записывает все существующие и будущие таблицы из исходной схемы в управляемые таблицы каталога Unity.
На странице назначения
выберите каталог каталога Unity и схему для записи. Если вы не хотите использовать существующую схему, щелкните Создать схему. Для доступа к родительскому каталогу у вас должны быть права
USE CATALOG
иCREATE SCHEMA
.Нажмите кнопку "Сохранить конвейер" и продолжить.
На странице "Параметры" нажмите кнопку "Создать расписание". Задайте частоту обновления целевых таблиц.
При необходимости задайте уведомления по электронной почте для успешного выполнения или сбоя операции конвейера.
Нажмите кнопку "Сохранить и запустить конвейер".
Пакеты активов Databricks
На этой вкладке описывается развертывание конвейера приема с помощью пакетов ресурсов Databricks (DAB). Пакеты могут содержать определения заданий и задач YAML, управляются с помощью интерфейса командной строки Databricks и могут использоваться совместно и выполняться в разных целевых рабочих областях (таких как разработка, промежуточное создание и производство). Дополнительные сведения см. в разделе "Пакеты ресурсов Databricks".
Создайте новый пакет с помощью интерфейса командной строки Databricks:
databricks bundle init
Добавьте два новых файла ресурсов в пакет:
- Файл определения конвейера (
resources/sfdc_pipeline.yml
). - Файл рабочего процесса, который управляет частотой приема данных (
resources/sfdc_job.yml
).
Ниже представлен пример файла
resources/sfdc_pipeline.yml
.variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for sfdc_dab resources: pipelines: pipeline_sfdc: name: salesforce_pipeline catalog: ${var.dest_catalog} schema: ${var.dest_schema} ingestion_definition: connection_name: <salesforce-connection> objects: # An array of objects to ingest from Salesforce. This example # ingests the AccountShare, AccountPartner, and ApexPage objects. - table: source_schema: objects source_table: AccountShare destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: AccountPartner destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: ApexPage destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} channel: "preview"
Ниже представлен пример файла
resources/sfdc_job.yml
.resources: jobs: sfdc_dab_job: name: sfdc_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
- Файл определения конвейера (
Разверните конвейер с помощью интерфейса командной строки Databricks:
databricks bundle deploy
Интерфейс командной строки Databricks
Чтобы создать конвейер
databricks pipelines create --json "<pipeline-definition | json-file-path>"
Чтобы обновить конвейер, выполните следующие действия.
databricks pipelines update --json "<<pipeline-definition | json-file-path>"
Чтобы получить определение конвейера, выполните следующие действия.
databricks pipelines get "<pipeline-id>"
Чтобы удалить конвейер, выполните следующие действия.
databricks pipelines delete "<pipeline-id>"
Дополнительные сведения см. в следующем:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Запуск, планирование и настройка оповещений в конвейере
После создания конвейера вернитесь в рабочую область Databricks и нажмите на Delta Live Tables.
Новый конвейер появится в списке конвейеров.
Чтобы просмотреть сведения о конвейере, щелкните имя конвейера.
На странице сведений о конвейере запустите конвейер, нажав кнопку "Пуск". Вы можете запланировать конвейер, нажав кнопку "Расписание".
Чтобы задать оповещения в конвейере, щелкните Расписание, щелкните Дополнительные параметры, а затем добавьте уведомление.
После завершения приема можно запросить таблицы.
Примечание.
При запуске конвейера может появиться два исходных представления для заданной таблицы. Одно представление содержит моментальные снимки полей формул. В другом представлении содержатся добавочные данные, извлекаемые для полей, отличных от формул. Эти представления объединяются в целевую таблицу.