次の方法で共有


Salesforce からデータを取り込む

重要

LakeFlow Connect は限定的なパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。

この記事では、LakeFlow Connect を使って Salesforce からデータを取り込み、それを Azure Databricks に読み込む方法について説明します。 最終的なインジェスト パイプラインは Unity Catalog によって管理され、サーバーレス コンピューティングと Delta Live Tables を利用します。

Salesforce インジェスト コネクタでは、次のソースがサポートされています:

  • Salesforce Sales クラウド

開始する前に

インジェスト パイプラインを作成するには、以下の要件を満たす必要があります。

  • ワークスペースが Unity Catalog に対して有効になっている。

  • サーバーレス コンピューティングがノートブック、ワークフロー、Delta Live Tables に対して有効になっている。 「サーバーレス コンピューティングを有効にする」をご覧ください。

  • 接続を作成するには: メタストアに対する CREATE CONNECTION があります。

    既存の接続を使用するには: 接続オブジェクトに対する USE CONNECTION または ALL PRIVILEGES があります。

  • ターゲット カタログに対する USE CATALOG

  • 既存のスキーマに対する USE SCHEMA および CREATE TABLE、またはターゲット カタログに対する CREATE SCHEMA

  • (推奨)Databricks がデータの取得に使用できる Salesforce ユーザーを作成します。 そのユーザーには、API アクセス権と、取り込む予定のすべてのオブジェクトに対するアクセス権を持たせます。

Salesforce 接続を作成する

必要なアクセス許可: メタストアに対する CREATE CONNECTION。 これを許可するには、メタストア管理者に問い合わせてください。

既存の接続を使用してインジェスト パイプラインを作成する場合は、次のセクションに進んでください。 接続に USE CONNECTION または ALL PRIVILEGES が必要です。

Salesforce 接続を作成するには、次のようにします。

  1. Azure Databricks ワークスペースで、[カタログ] > [外部の場所] > [接続] > [接続の作成] をクリックします。

  2. [接続名] で、Salesforce 接続の一意の名前を指定します。

  3. [接続の種類] で、[Salesforce] をクリックします。

  4. Salesforce サンドボックス アカウントから取り込む場合は、[サンドボックスである]true に設定します。

  5. [Salesforce でログイン] をクリックします。

    Salesforce のログイン

  6. Salesforce サンドボックスから取り込む場合は、[ カスタム ドメインの使用] をクリックします。 サンドボックス URL を指定し、ログインに進みます。 Databricks では、Databricks インジェスト専用の Salesforce ユーザーとしてログインすることをお勧めします。

    [カスタム ドメインの使用] ボタン

    サンドボックスの URL を入力する

  7. [接続の作成] ページに戻って、[作成] をクリックします。

インジェスト パイプラインを作成する

必要なアクセス許可:接続に対するUSE CONNECTION または ALL PRIVILEGES

このステップでは、インジェスト パイプラインを作成する方法について説明します。 取り込まれる各テーブルは、明示的に名前を変更しない限り、既定で、取り込み先の同じ名前 (ただしすべて小文字) を持つストリーミング テーブルに対応します。

Databricks UI

  1. Azure Databricks ワークスペースのサイドバーで、[ Data Ingestion をクリックします。

  2. [ データの追加 ] ページの [ Databricks コネクタで、[ Salesforce] をクリックします。

    Salesforce インジェスト ウィザードが開きます。

  3. ウィザードの Pipeline ページで、インジェスト パイプラインの一意の名前を入力します。

  4. Destination カタログドロップダウンで、カタログを選択します。 取り込まれたデータとイベント ログがこのカタログに書き込まれます。

  5. Salesforce データへのアクセスに必要な資格情報を格納する Unity カタログ接続を選択します。

    Salesforce 接続がない場合は、[接続 作成] をクリックします。 メタストアに対する CREATE CONNECTION 特権が必要です。

  6. [パイプラインの作成] をクリックして続行します

  7. Source ページで、Databricks に取り込む Salesforce テーブルを選択し、[次へ] をクリック

    スキーマを選択すると、Salesforce インジェスト コネクタによって、ソース スキーマ内のすべての既存のテーブルと将来のテーブルが Unity カタログのマネージド テーブルに書き込まれます。

  8. [ Destination ページで、書き込む Unity カタログカタログとスキーマを選択します。

    既存のスキーマを使用しない場合は、[ スキーマの作成] をクリックします。 親カタログに対する USE CATALOG および CREATE SCHEMA 特権が必要です。

  9. [パイプラインの保存] をクリックして続行

  10. [ Settings ページで、[スケジュールの作成] クリック。 変換先テーブルを更新する頻度を設定します。

  11. 必要に応じて、パイプライン操作の成功または失敗に関する電子メール通知を設定します。

  12. [ 保存してパイプラインを実行] をクリックします。

Databricks アセット バンドル

このタブでは、Databricks Asset Bundles (DAB) を使用してインジェスト パイプラインをデプロイする方法について説明します。 バンドルには、ジョブとタスクの YAML 定義を含め、Databricks CLI を使用して管理できます。また、さまざまなターゲット ワークスペース (開発、ステージング、運用など) で共有および実行できます。 詳細については、「 Databricks アセット バンドル」を参照してください。

  1. Databricks CLI を使用して新しいバンドルを作成します。

    databricks bundle init
    
  2. バンドルに 2 つの新しいリソース ファイルを追加します。

    • パイプライン定義ファイル (resources/sfdc_pipeline.yml)。
    • データ インジェストの頻度を制御するワークフロー ファイル (resources/sfdc_job.yml)。

    resources/sfdc_pipeline.yml ファイルの例を以下に示します。

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          channel: "preview"
    

    resources/sfdc_job.yml ファイルの例を以下に示します。

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Databricks CLI を使用してパイプラインをデプロイします。

    databricks bundle deploy
    

Databricks CLI

パイプライン

databricks pipelines create --json "<pipeline-definition | json-file-path>"

パイプラインを更新するには:

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

パイプライン定義を取得するには:

databricks pipelines get "<pipeline-id>"

パイプラインを削除するには:

databricks pipelines delete "<pipeline-id>"

詳細が必要な場合は、以下を実行できます。

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

パイプラインの開始、スケジュール、アラート設定を行う

  1. パイプラインが作成されたら、Databricks ワークスペースに戻った後、[Delta Live Tables] をクリックします。

    新しいパイプラインがパイプライン一覧内に表示されます。

  2. パイプラインの詳細を表示するには、そのパイプライン名をクリックします。

  3. パイプラインの詳細ページで、[開始] をクリックしてパイプラインを実行します。 パイプラインをスケジュールするには [スケジュール] をクリックします。

  4. パイプライン上にアラートを設定するには、[スケジュール] をクリックし、[その他のオプション] をクリックしてから、通知を追加します。

  5. インジェストが完了したら、テーブルに対してクエリを実行できます。

Note

パイプラインを実行すると、指定したテーブルに対する 2 つのソース ビューが表示されることがあります。 1 つのビューには、数式フィールドのスナップショットが含まれています。 もう 1 つのビューには、数式以外のフィールドの増分データ プルが含まれています。 これらのビューは、宛先テーブルに結合されます。