Salesforce からデータを取り込む
重要
LakeFlow Connect は限定的なパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。
この記事では、LakeFlow Connect を使って Salesforce からデータを取り込み、それを Azure Databricks に読み込む方法について説明します。 最終的なインジェスト パイプラインは Unity Catalog によって管理され、サーバーレス コンピューティングと Delta Live Tables を利用します。
Salesforce インジェスト コネクタでは、次のソースがサポートされています:
- Salesforce Sales クラウド
開始する前に
インジェスト パイプラインを作成するには、以下の要件を満たす必要があります。
ワークスペースが Unity Catalog に対して有効になっている。
サーバーレス コンピューティングがノートブック、ワークフロー、Delta Live Tables に対して有効になっている。 「サーバーレス コンピューティングを有効にする」をご覧ください。
接続を作成するには: メタストアに対する
CREATE CONNECTION
があります。既存の接続を使用するには: 接続オブジェクトに対する
USE CONNECTION
またはALL PRIVILEGES
があります。ターゲット カタログに対する
USE CATALOG
。既存のスキーマに対する
USE SCHEMA
およびCREATE TABLE
、またはターゲット カタログに対するCREATE SCHEMA
。(推奨)Databricks がデータの取得に使用できる Salesforce ユーザーを作成します。 そのユーザーには、API アクセス権と、取り込む予定のすべてのオブジェクトに対するアクセス権を持たせます。
Salesforce 接続を作成する
必要なアクセス許可: メタストアに対する CREATE CONNECTION
。 これを許可するには、メタストア管理者に問い合わせてください。
既存の接続を使用してインジェスト パイプラインを作成する場合は、次のセクションに進んでください。 接続に USE CONNECTION
または ALL PRIVILEGES
が必要です。
Salesforce 接続を作成するには、次のようにします。
Azure Databricks ワークスペースで、[カタログ] > [外部の場所] > [接続] > [接続の作成] をクリックします。
[接続名] で、Salesforce 接続の一意の名前を指定します。
[接続の種類] で、[Salesforce] をクリックします。
Salesforce サンドボックス アカウントから取り込む場合は、[サンドボックスである] を
true
に設定します。[Salesforce でログイン] をクリックします。
Salesforce サンドボックスから取り込む場合は、[ カスタム ドメインの使用] をクリックします。 サンドボックス URL を指定し、ログインに進みます。 Databricks では、Databricks インジェスト専用の Salesforce ユーザーとしてログインすることをお勧めします。
[接続の作成] ページに戻って、[作成] をクリックします。
インジェスト パイプラインを作成する
必要なアクセス許可:接続に対するUSE CONNECTION
または ALL PRIVILEGES
。
このステップでは、インジェスト パイプラインを作成する方法について説明します。 取り込まれる各テーブルは、明示的に名前を変更しない限り、既定で、取り込み先の同じ名前 (ただしすべて小文字) を持つストリーミング テーブルに対応します。
Databricks UI
Azure Databricks ワークスペースのサイドバーで、[ Data Ingestion をクリックします。
[ データの追加 ] ページの [ Databricks コネクタで、[ Salesforce] をクリックします。
Salesforce インジェスト ウィザードが開きます。
ウィザードの Pipeline ページで、インジェスト パイプラインの一意の名前を入力します。
Destination カタログドロップダウンで、カタログを選択します。 取り込まれたデータとイベント ログがこのカタログに書き込まれます。
Salesforce データへのアクセスに必要な資格情報を格納する Unity カタログ接続を選択します。
Salesforce 接続がない場合は、[接続 作成] をクリックします。 メタストアに対する
CREATE CONNECTION
特権が必要です。[パイプラインの作成] をクリックして続行します。
Source ページで、Databricks に取り込む Salesforce テーブルを選択し、[次へ] をクリック。
スキーマを選択すると、Salesforce インジェスト コネクタによって、ソース スキーマ内のすべての既存のテーブルと将来のテーブルが Unity カタログのマネージド テーブルに書き込まれます。
[ Destination ページで、書き込む Unity カタログカタログとスキーマを選択します。
既存のスキーマを使用しない場合は、[ スキーマの作成] をクリックします。 親カタログに対する
USE CATALOG
およびCREATE SCHEMA
特権が必要です。[パイプラインの保存] をクリックして続行。
[ Settings ページで、[スケジュールの作成] クリック。 変換先テーブルを更新する頻度を設定します。
必要に応じて、パイプライン操作の成功または失敗に関する電子メール通知を設定します。
[ 保存してパイプラインを実行] をクリックします。
Databricks アセット バンドル
このタブでは、Databricks Asset Bundles (DAB) を使用してインジェスト パイプラインをデプロイする方法について説明します。 バンドルには、ジョブとタスクの YAML 定義を含め、Databricks CLI を使用して管理できます。また、さまざまなターゲット ワークスペース (開発、ステージング、運用など) で共有および実行できます。 詳細については、「 Databricks アセット バンドル」を参照してください。
Databricks CLI を使用して新しいバンドルを作成します。
databricks bundle init
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (
resources/sfdc_pipeline.yml
)。 - データ インジェストの頻度を制御するワークフロー ファイル (
resources/sfdc_job.yml
)。
resources/sfdc_pipeline.yml
ファイルの例を以下に示します。variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for sfdc_dab resources: pipelines: pipeline_sfdc: name: salesforce_pipeline catalog: ${var.dest_catalog} schema: ${var.dest_schema} ingestion_definition: connection_name: <salesforce-connection> objects: # An array of objects to ingest from Salesforce. This example # ingests the AccountShare, AccountPartner, and ApexPage objects. - table: source_schema: objects source_table: AccountShare destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: AccountPartner destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: ApexPage destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} channel: "preview"
resources/sfdc_job.yml
ファイルの例を以下に示します。resources: jobs: sfdc_dab_job: name: sfdc_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
- パイプライン定義ファイル (
Databricks CLI を使用してパイプラインをデプロイします。
databricks bundle deploy
Databricks CLI
パイプライン
databricks pipelines create --json "<pipeline-definition | json-file-path>"
パイプラインを更新するには:
databricks pipelines update --json "<<pipeline-definition | json-file-path>"
パイプライン定義を取得するには:
databricks pipelines get "<pipeline-id>"
パイプラインを削除するには:
databricks pipelines delete "<pipeline-id>"
詳細が必要な場合は、以下を実行できます。
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
パイプラインの開始、スケジュール、アラート設定を行う
パイプラインが作成されたら、Databricks ワークスペースに戻った後、[Delta Live Tables] をクリックします。
新しいパイプラインがパイプライン一覧内に表示されます。
パイプラインの詳細を表示するには、そのパイプライン名をクリックします。
パイプラインの詳細ページで、[開始] をクリックしてパイプラインを実行します。 パイプラインをスケジュールするには [スケジュール] をクリックします。
パイプライン上にアラートを設定するには、[スケジュール] をクリックし、[その他のオプション] をクリックしてから、通知を追加します。
インジェストが完了したら、テーブルに対してクエリを実行できます。
Note
パイプラインを実行すると、指定したテーブルに対する 2 つのソース ビューが表示されることがあります。 1 つのビューには、数式フィールドのスナップショットが含まれています。 もう 1 つのビューには、数式以外のフィールドの増分データ プルが含まれています。 これらのビューは、宛先テーブルに結合されます。