Compartir a través de


Ingesta de datos desde Salesforce

Importante

LakeFlow Connect está en versión preliminar pública. Para participar en la versión preliminar, póngase en contacto con el equipo de cuentas de Databricks.

En este artículo se describe cómo ingerir datos desde Salesforce y cargarlos en Azure Databricks mediante LakeFlow Connect. La canalización de ingesta resultante se rige mediante Unity Catalog y cuenta con tecnología de proceso sin servidor y Delta Live Tables.

El conector de ingesta de Salesforce admite el siguiente origen:

  • Salesforce Sales Cloud

Antes de empezar

Para crear una canalización de ingesta, debe cumplir los siguientes requisitos:

  • El área de trabajo está habilitada para Unity Catalog.

  • El proceso sin servidor está habilitado para cuadernos, flujos de trabajo y Delta Live Tables. Consulte Habilitación del proceso sin servidor.

  • Para crear una conexión: tiene CREATE CONNECTION en el metastore.

    Para usar una conexión existente: tiene USE CONNECTION o ALL PRIVILEGES en el objeto de conexión.

  • USE CATALOG en el catálogo de destino.

  • USE SCHEMA y CREATE TABLE en un esquema o CREATE SCHEMA existente en el catálogo de destino.

  • (Recomendado) Cree un usuario de Salesforce que Databricks pueda usar para recuperar datos. Asegúrese de que el usuario tiene acceso a la API y a todos los objetos que planea ingerir.

Creación de una conexión de Salesforce

Permisos obligatorios:CREATE CONNECTION en el metastore. Póngase en contacto con un administrador del metastore para que se lo conceda.

Si desea crear una canalización de ingesta mediante una conexión existente, vaya a la sección siguiente. USE CONNECTION Necesita o ALL PRIVILEGES en la conexión.

Para crear una conexión de Salesforce, siga estos pasos:

  1. En el área de trabajo de Azure Databricks, haga clic en Catálogo > Ubicaciones externas > Conexiones > Crear conexión.

  2. En Nombre de conexión, especifique un nombre único para la conexión de Salesforce.

  3. En Tipo de conexión, haga clic en Salesforce.

  4. Si va a realizar la ingesta desde una cuenta de espacio aislado de Salesforce, establezca Es espacio aislado en true.

  5. Haga clic en Iniciar sesión con Salesforce.

    Inicio de sesión de Salesforce

  6. Si va a ingerir desde un espacio aislado de Salesforce, haga clic en Usar dominio personalizado. Proporcione la dirección URL del espacio aislado y, a continuación, continúe con el inicio de sesión. Databricks recomienda iniciar sesión como usuario de Salesforce dedicado a la ingesta de Databricks.

    Botón Usar dominio personalizado

    Escriba la dirección URL del espacio aislado

  7. Después de volver a la página Crear conexión, haga clic en Crear.

Creación de una canalización de ingesta

Permisos necesarios:USE CONNECTION o ALL PRIVILEGES en una conexión.

En este paso se describe cómo crear la canalización de ingesta. Cada tabla que se ingiere se corresponde a una tabla de streaming con el mismo nombre (pero en minúsculas) en el destino de manera predeterminada, a menos que le cambie el nombre explícitamente.

Interfaz de usuario de Databricks

  1. En la barra lateral del área de trabajo de Azure Databricks, haga clic en Ingesta de datos.

  2. En la página Agregar datos , en Conectores de Databricks, haga clic en Salesforce.

    Se abre el Asistente para ingesta de Salesforce.

  3. En la página Canalización del asistente, escriba un nombre único para la canalización de ingesta.

  4. En la lista desplegable Catálogo de destino, seleccione un catálogo. Los datos ingeridos y los registros de eventos se escribirán en este catálogo.

  5. Seleccione la conexión de Catálogo de Unity que almacena las credenciales necesarias para acceder a los datos de Salesforce.

    Si no hay conexiones de Salesforce, haga clic en Crear conexión. Debe tener el CREATE CONNECTION privilegio en el metastore.

  6. Haga clic en Crear canalización y continúe.

  7. En la página Origen , seleccione las tablas de Salesforce para ingerir en Databricks y, a continuación, haga clic en Siguiente.

    Si selecciona un esquema, el conector de ingesta de Salesforce escribe todas las tablas existentes y futuras en el esquema de origen en tablas administradas de Unity Catalog.

  8. En la página Destino , seleccione el catálogo de Unity Catalog y el esquema en los que escribir.

    Si no desea usar un esquema existente, haga clic en Crear esquema. Debe tener los USE CATALOG privilegios y CREATE SCHEMA en el catálogo primario.

  9. Haga clic en Guardar canalización y continúe.

  10. En la página Configuración , haga clic en Crear programación. Establezca la frecuencia para actualizar las tablas de destino.

  11. Opcionalmente, establezca notificaciones por correo electrónico para que la operación de canalización se complete correctamente o no.

  12. Haga clic en Guardar y ejecutar canalización.

Conjuntos de recursos de Databricks

En esta pestaña se describe cómo implementar una canalización de ingesta mediante conjuntos de recursos (DAB) de Databricks. Las agrupaciones pueden contener definiciones de YAML de trabajos y tareas, se administran mediante la CLI de Databricks y se pueden compartir y ejecutar en diferentes áreas de trabajo de destino (como desarrollo, almacenamiento provisional y producción). Para más información, consulte Conjuntos de recursos de Databricks.

  1. Cree una nueva agrupación mediante la CLI de Databricks:

    databricks bundle init
    
  2. Agregue dos nuevos archivos de recursos al lote:

    • Un archivo de definición de canalización (resources/sfdc_pipeline.yml).
    • Un archivo de flujo de trabajo que controla la frecuencia de ingesta de datos (resources/sfdc_job.yml).

    A continuación se muestra un archivo resources/sfdc_pipeline.yml de ejemplo:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          channel: 'preview'
    

    A continuación se muestra un archivo resources/sfdc_job.yml de ejemplo:

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Implemente la canalización mediante la CLI de Databricks:

    databricks bundle deploy
    

CLI de Databricks

Para crear la canalización:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

Para actualiza la canalización:

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

Para obtener la definición de la canalización:

databricks pipelines get "<pipeline-id>"

Para eliminar la canalización:

databricks pipelines delete "<pipeline-id>"

Para más información, puede ejecutar lo siguiente:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Inicio, programación y establecimiento de alertas en la canalización

  1. Una vez que se ha creado la canalización, vuelva a visitar el área de trabajo de Databricks y, después, haga clic en Delta Live Tables.

    La nueva canalización aparece en la lista de canalizaciones.

  2. Para ver los detalles de la canalización, haga clic en el nombre de la canalización.

  3. En la página de detalles de la canalización, haga clic en Iniciar para ejecutar la canalización. Puede programar la canalización si hace clic en Programar.

  4. Para establecer alertas en la canalización, haga clic en Programar, haga clic en Más opciones y, después, agregue una notificación.

  5. Una vez que se completa la ingesta, puede consultar las tablas.

Nota:

Cuando se ejecuta la canalización, es posible que vea dos vistas de origen para una tabla determinada. Una vista contiene las instantáneas de los campos de fórmula. La otra vista contiene las extracciones de datos incrementales para los campos que no son de fórmula. Estas vistas se combinan en la tabla de destino.