Condividi tramite


Inserire dati da Salesforce

Importante

LakeFlow Connect è in anteprima pubblica controllata. Per partecipare all’anteprima, contattare il team dell’account Databricks.

Questo articolo descrive come inserire dati da Salesforce e caricarli in Azure Databricks usando LakeFlow Connect. La pipeline di inserimento risultante è governata dal Catalogo Unity ed è basata su calcolo serverless e Delta Live Tables.

Il connettore di inserimento Salesforce supporta l'origine seguente:

  • Salesforce Sales Cloud

Operazioni preliminari

Per creare una pipeline di inserimento, è necessario soddisfare i requisiti seguenti:

  • L'area di lavoro è abilitata per Unity Catalog.

  • L'ambiente di calcolo serverless è abilitato per notebook, flussi di lavoro e Delta Live Tables. Consultare la sezione Abilitare l’elaborazione serverless.

  • Per creare una connessione: è presente CREATE CONNECTION nel metastore.

    Per usare una connessione esistente: si dispone USE CONNECTION o ALL PRIVILEGES sull'oggetto connessione.

  • USE CATALOG nel catalogo di destinazione.

  • USE SCHEMA e CREATE TABLE su uno schema esistente oppure CREATE SCHEMA sul catalogo di destinazione.

  • (Scelta consigliata) Creare un utente salesforce che Databricks può usare per recuperare i dati. Assicurarsi che l'utente abbia accesso all'API e accesso a tutti gli oggetti che si prevede di inserire.

Creare una connessione Salesforce

Autorizzazioni necessarie:CREATE CONNECTION nel metastore. Per concedere questa operazione, contattare un amministratore del metastore.

Se si vuole creare una pipeline di inserimento usando una connessione esistente, passare alla sezione seguente. È necessario USE CONNECTION o ALL PRIVILEGES sulla connessione.

Per creare una connessione Salesforce, eseguire le operazioni seguenti:

  1. Nell'area di lavoro di Azure Databricks, clicca su Catalog > Percorsi esterni > Connessioni > Crea connessione.

  2. Per Nome connessione specificare un nome univoco per la connessione Salesforce.

  3. In Tipo di connessione fare clic su Salesforce.

  4. Se si sta eseguendo l'inserimento da un account sandbox di Salesforce, impostare È sandbox su true.

  5. Fare clic su Accedi con Salesforce.

    Account di accesso di Salesforce

  6. Se si sta eseguendo l'inserimento da una sandbox salesforce, fare clic su Usa dominio personalizzato. Specificare l'URL della sandbox e quindi procedere con l'accesso. Databricks consiglia di accedere come utente salesforce dedicato all'inserimento di Databricks.

    Usare il pulsante dominio personalizzato

    Immettere l'URL della sandbox

  7. Dopo aver restituito la pagina Crea connessione , fare clic su Crea.

Creare una pipeline di inserimento

Autorizzazioni necessarie:USE CONNECTION o ALL PRIVILEGES per una connessione.

Questo passaggio descrive come creare la pipeline di inserimento. Ogni tabella inserita corrisponde a una tabella di streaming con lo stesso nome (ma tutte minuscole) nella destinazione per impostazione predefinita, a meno che non venga rinominata in modo esplicito.

Interfaccia utente Databricks

  1. Nella barra laterale dell'area di lavoro di Azure Databricks fare clic su Inserimento dati.

  2. Nella pagina Aggiungi dati in Connettori Databricks fare clic su Salesforce.

    Verrà visualizzata la procedura guidata di inserimento di Salesforce.

  3. Nella pagina Pipeline della procedura guidata immettere un nome univoco per la pipeline di inserimento.

  4. Nel menu a tendina del catalogo di destinazione, selezionare un catalogo. I dati acquisiti e i registri degli eventi verranno scritti in questo catalogo.

  5. Selezionare la connessione di Unity Catalog in cui sono archiviate le credenziali necessarie per accedere ai dati di Salesforce.

    Se non sono presenti connessioni Salesforce, fare clic su Crea connessione. È necessario avere il CREATE CONNECTION privilegio per il metastore.

  6. Fare clic su Crea pipeline e continuare.

  7. Nella pagina Origine, selezionare le tabelle di Salesforce da integrare in Databricks e quindi fare clic su Avanti.

    Se si seleziona uno schema, il connettore di inserimento Salesforce scrive tutte le tabelle esistenti e future nello schema di origine nelle tabelle gestite di Unity Catalog.

  8. Nella pagina destinazione selezionare il catalogo e lo schema del catalogo Unity in cui scrivere.

    Se non si vuole usare uno schema esistente, fare clic su Crea schema. È necessario disporre dei privilegi di USE CATALOG e CREATE SCHEMA nel catalogo padre.

  9. Fare clic su Salva pipeline e continuare.

  10. Nella pagina Impostazioni fare clic su Crea pianificazione. Impostare la frequenza per aggiornare le tabelle di destinazione.

  11. Facoltativamente, impostare le notifiche di posta elettronica per l'esito positivo o negativo dell'operazione della pipeline.

  12. Fare clic su Salva ed esegui pipeline.

Bundle di asset di Databricks

Questa scheda descrive come distribuire una pipeline di inserimento usando i bundle di asset di Databricks .This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles (DAB). I bundle possono contenere definizioni YAML di processi e attività, vengono gestiti tramite l'interfaccia della riga di comando di Databricks e possono essere condivisi ed eseguiti in aree di lavoro di destinazione diverse, ad esempio sviluppo, gestione temporanea e produzione. Per altre informazioni, vedere Aggregazioni di asset di Databricks.

  1. Creare un nuovo bundle usando l'interfaccia della riga di comando di Databricks:

    databricks bundle init
    
  2. Aggiungere due nuovi file di risorse al bundle:

    • Un file di definizione della pipeline (resources/sfdc_pipeline.yml).
    • File del flusso di lavoro che controlla la frequenza di inserimento dati (resources/sfdc_job.yml).

    Di seguito è riportato un file resources/sfdc_pipeline.yml di esempio:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          channel: "preview"
    

    Di seguito è riportato un file resources/sfdc_job.yml di esempio:

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Distribuire la pipeline usando l'interfaccia della riga di comando di Databricks:

    databricks bundle deploy
    

Databricks CLI

Per creare la pipeline

databricks pipelines create --json "<pipeline-definition | json-file-path>"

Per aggiornare la pipeline:

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

Per ottenere la definizione della pipeline:

databricks pipelines get "<pipeline-id>"

Per eliminare la pipeline:

databricks pipelines delete "<pipeline-id>"

Per altre informazioni, è possibile eseguire:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Avviare, pianificare e impostare avvisi per la tua pipeline

  1. Dopo aver creato la pipeline, tornare all'area di lavoro di Databricks e quindi fare clic su Delta Live Tables.

    La nuova pipeline viene visualizzata nell'elenco delle pipeline.

  2. Per visualizzare i dettagli della pipeline, fare clic sul nome della pipeline.

  3. Nella pagina dei dettagli della pipeline eseguire la pipeline facendo clic su Avvia. È possibile pianificare la pipeline facendo clic su Pianifica.

  4. Per impostare gli avvisi nella pipeline, fare clic su Schedule, fare clic su Altre opzionie quindi aggiungere una notifica.

  5. Al termine dell'inserimento, puoi interrogare le tue tabelle.

Nota

Quando la pipeline viene eseguita, è possibile che vengano visualizzate due viste di origine per una determinata tabella. Una visualizzazione contiene gli snapshot per i campi della formula. L'altra vista contiene i pull incrementali dei dati per i campi non formula. Queste viste vengono unite nella tabella di destinazione.