Поделиться через


Прием данных из Salesforce

Внимание

LakeFlow Connect находится в закрытой общедоступной предварительной версии. Чтобы принять участие в предварительной версии, обратитесь к группе учетной записи Databricks.

В этой статье описывается прием данных из Salesforce и его загрузка в Azure Databricks с помощью LakeFlow Connect. Результирующий конвейер приема управляется Unity Catalog с использованием бессерверных вычислительных ресурсов и Delta Live Tables.

Соединитель приема Salesforce поддерживает следующий источник:

  • Salesforce Sales Cloud

Подготовка к работе

Чтобы создать конвейер приема, необходимо выполнить следующие требования:

  • Ваша рабочая область активирована для Unity Catalog.

  • Бессерверные вычисления доступны для записных книжек, рабочих процессов и Delta Live Tables. См. раздел "Включить бессерверные вычисления".

  • Создание подключения: у вас есть CREATE CONNECTION в хранилище метаданных.

    Для использования существующего подключения: у USE CONNECTION вас есть или ALL PRIVILEGES объект подключения.

  • USE CATALOG в целевом каталоге.

  • USE SCHEMA и CREATE TABLE на существующей схеме или CREATE SCHEMA на целевом каталоге.

  • (Рекомендуется) Создайте пользователя Salesforce, который Databricks может использовать для извлечения данных. Убедитесь, что у пользователя есть доступ к API и доступ ко всем объектам, которые вы планируете принимать.

Создание подключения Salesforce

Необходимые разрешения:CREATE CONNECTION в хранилище метаданных. Чтобы предоставить это, обратитесь к администратору хранилища метаданных.

Если вы хотите создать конвейер приема с помощью существующего подключения, перейдите к следующему разделу. Вам нужно USE CONNECTION или ALL PRIVILEGES подключиться.

Чтобы создать подключение Salesforce, сделайте следующее:

  1. В рабочей области Azure Databricks щелкните Каталог > Внешние расположения > Подключения > Создать подключение.

  2. Для имени подключения укажите уникальное имя подключения Salesforce.

  3. Для типа подключения щелкните Salesforce.

  4. Если вы получаете данные из учетной записи песочницы Salesforce, установите для параметр "Является песочницей" на true.

  5. Нажмите кнопку "Войти" с помощью Salesforce.

    Имя входа Salesforce

  6. Если вы используете песочницу Salesforce, нажмите кнопку "Использовать личный домен". Укажите URL-адрес песочницы и перейдите к входу. Databricks рекомендует войти в систему как пользователь Salesforce, предназначенный для приема Databricks.

    Кнопка

    Введите URL-адрес песочницы

  7. После возвращения на страницу "Создать подключение " нажмите кнопку "Создать".

Создание конвейера приема

Необходимые разрешения:USE CONNECTION или ALL PRIVILEGES подключение.

На этом шаге описывается создание конвейера приема. Каждая загруженная таблица по умолчанию соответствует таблице потоковой передачи с тем же именем, но написанная строчными буквами, в качестве пункта назначения, если вы явно не переименуете ее.

Пользовательский интерфейс Databricks

  1. На боковой панели рабочей области Azure Databricks щелкните "Прием данных".

  2. На странице "Добавление данных" в разделе Соединители Databricks щелкните Salesforce.

    Откроется мастер приема Salesforce.

  3. На странице конвейера мастера введите уникальное имя конвейера приема.

  4. В раскрывающемся списке каталога назначения выберите каталог. Полученные данные и журналы событий будут записаны в этот каталог.

  5. Выберите подключение каталога Unity, которое хранит учетные данные, необходимые для доступа к данным Salesforce.

    Если подключений Salesforce нет, щелкните Создать подключение. У вас должна быть привилегия CREATE CONNECTION в хранилище метаданных.

  6. Нажмите кнопку "Создать конвейер" и продолжить.

  7. На странице источника выберите таблицы Salesforce для загрузки в Databricks, а затем нажмите кнопку Далее.

    При выборе схемы соединитель загрузки Salesforce записывает все существующие и будущие таблицы из исходной схемы в управляемые таблицы каталога Unity.

  8. На странице назначения выберите каталог каталога Unity и схему для записи.

    Если вы не хотите использовать существующую схему, щелкните Создать схему. Для доступа к родительскому каталогу у вас должны быть права USE CATALOG и CREATE SCHEMA.

  9. Нажмите кнопку "Сохранить конвейер" и продолжить.

  10. На странице "Параметры" нажмите кнопку "Создать расписание". Задайте частоту обновления целевых таблиц.

  11. При необходимости задайте уведомления по электронной почте для успешного выполнения или сбоя операции конвейера.

  12. Нажмите кнопку "Сохранить и запустить конвейер".

Пакеты активов Databricks

На этой вкладке описывается развертывание конвейера приема с помощью пакетов ресурсов Databricks (DAB). Пакеты могут содержать определения заданий и задач YAML, управляются с помощью интерфейса командной строки Databricks и могут использоваться совместно и выполняться в разных целевых рабочих областях (таких как разработка, промежуточное создание и производство). Дополнительные сведения см. в разделе "Пакеты ресурсов Databricks".

  1. Создайте новый пакет с помощью интерфейса командной строки Databricks:

    databricks bundle init
    
  2. Добавьте два новых файла ресурсов в пакет:

    • Файл определения конвейера (resources/sfdc_pipeline.yml).
    • Файл рабочего процесса, который управляет частотой приема данных (resources/sfdc_job.yml).

    Ниже представлен пример файла resources/sfdc_pipeline.yml.

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          channel: "preview"
    

    Ниже представлен пример файла resources/sfdc_job.yml.

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Разверните конвейер с помощью интерфейса командной строки Databricks:

    databricks bundle deploy
    

Интерфейс командной строки Databricks

Чтобы создать конвейер

databricks pipelines create --json "<pipeline-definition | json-file-path>"

Чтобы обновить конвейер, выполните следующие действия.

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

Чтобы получить определение конвейера, выполните следующие действия.

databricks pipelines get "<pipeline-id>"

Чтобы удалить конвейер, выполните следующие действия.

databricks pipelines delete "<pipeline-id>"

Дополнительные сведения см. в следующем:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Запуск, планирование и настройка оповещений в конвейере

  1. После создания конвейера вернитесь в рабочую область Databricks и нажмите на Delta Live Tables.

    Новый конвейер появится в списке конвейеров.

  2. Чтобы просмотреть сведения о конвейере, щелкните имя конвейера.

  3. На странице сведений о конвейере запустите конвейер, нажав кнопку "Пуск". Вы можете запланировать конвейер, нажав кнопку "Расписание".

  4. Чтобы задать оповещения в конвейере, щелкните Расписание, щелкните Дополнительные параметры, а затем добавьте уведомление.

  5. После завершения приема можно запросить таблицы.

Примечание.

При запуске конвейера может появиться два исходных представления для заданной таблицы. Одно представление содержит моментальные снимки полей формул. В другом представлении содержатся добавочные данные, извлекаемые для полей, отличных от формул. Эти представления объединяются в целевую таблицу.